diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index efb4d81e8637d..17694ff2b4592 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -380,7 +380,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { // assign it again, even if that means putting it back on the node on which it previously failed: final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); logger.trace("{}, scheduling a reroute", reason); - rerouteService.reroute(reason, ActionListener.wrap( + rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap( r -> logger.trace("{}, reroute completed", reason), e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index c5ed468993ff7..e3cf0cb2558b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.Priority; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.ArrayList; @@ -150,7 +151,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo results.success(joinTask); } if (nodesChanged) { - rerouteService.reroute("post-join reroute", ActionListener.wrap( + rerouteService.reroute("post-join reroute", Priority.HIGH, ActionListener.wrap( r -> logger.trace("post-join reroute completed"), e -> logger.debug("post-join reroute failed", e))); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java index 4ed4caadabe49..0e387db5f45ef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; @@ -32,6 +31,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; +import java.util.ArrayList; +import java.util.List; import java.util.function.BiFunction; /** @@ -49,7 +50,8 @@ public class BatchedRerouteService implements RerouteService { private final Object mutex = new Object(); @Nullable // null if no reroute is currently pending - private PlainListenableActionFuture pendingRerouteListeners; + private List> pendingRerouteListeners; + private Priority pendingTaskPriority = Priority.LANGUID; /** * @param reroute Function that computes the updated cluster state after it has been rerouted. @@ -63,29 +65,55 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction listener) { - final PlainListenableActionFuture currentListeners; + public final void reroute(String reason, Priority priority, ActionListener listener) { + final List> currentListeners; synchronized (mutex) { if (pendingRerouteListeners != null) { - logger.trace("already has pending reroute, adding [{}] to batch", reason); - pendingRerouteListeners.addListener(listener); - return; + if (priority.sameOrAfter(pendingTaskPriority)) { + logger.trace("already has pending reroute at priority [{}], adding [{}] with priority [{}] to batch", + pendingTaskPriority, reason, priority); + pendingRerouteListeners.add(listener); + return; + } else { + logger.trace("already has pending reroute at priority [{}], promoting batch to [{}] and adding [{}]", + pendingTaskPriority, priority, reason); + currentListeners = new ArrayList<>(1 + pendingRerouteListeners.size()); + currentListeners.add(listener); + currentListeners.addAll(pendingRerouteListeners); + pendingRerouteListeners.clear(); + pendingRerouteListeners = currentListeners; + pendingTaskPriority = priority; + } + } else { + logger.trace("no pending reroute, scheduling reroute [{}] at priority [{}]", reason, priority); + currentListeners = new ArrayList<>(1); + currentListeners.add(listener); + pendingRerouteListeners = currentListeners; + pendingTaskPriority = priority; } - currentListeners = PlainListenableActionFuture.newListenableFuture(); - currentListeners.addListener(listener); - pendingRerouteListeners = currentListeners; } - logger.trace("rerouting [{}]", reason); try { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", - new ClusterStateUpdateTask(Priority.HIGH) { + new ClusterStateUpdateTask(priority) { + @Override public ClusterState execute(ClusterState currentState) { + final boolean currentListenersArePending; synchronized (mutex) { - assert pendingRerouteListeners == currentListeners; - pendingRerouteListeners = null; + assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners) + : "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners; + currentListenersArePending = pendingRerouteListeners == currentListeners; + if (currentListenersArePending) { + pendingRerouteListeners = null; + } + } + if (currentListenersArePending) { + logger.trace("performing batched reroute [{}]", reason); + return reroute.apply(currentState, reason); + } else { + logger.trace("batched reroute [{}] was promoted", reason); + return currentState; } - return reroute.apply(currentState, reason); } @Override @@ -95,7 +123,7 @@ public void onNoLongerMaster(String source) { pendingRerouteListeners = null; } } - currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled")); + ActionListener.onFailure(currentListeners, new NotMasterException("delayed reroute [" + reason + "] cancelled")); // no big deal, the new master will reroute again } @@ -114,22 +142,26 @@ public void onFailure(String source, Exception e) { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", source, state.version()), e); } - currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e)); + ActionListener.onFailure(currentListeners, + new ElasticsearchException("delayed reroute [" + reason + "] failed", e)); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - currentListeners.onResponse(null); + ActionListener.onResponse(currentListeners, null); } }); } catch (Exception e) { synchronized (mutex) { - assert pendingRerouteListeners == currentListeners; - pendingRerouteListeners = null; + assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners); + if (pendingRerouteListeners == currentListeners) { + pendingRerouteListeners = null; + } } ClusterState state = clusterService.state(); logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); - currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e)); + ActionListener.onFailure(currentListeners, + new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e)); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java index 11a49322e10fa..58f9e41fe88a7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java @@ -19,11 +19,19 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Priority; /** * Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate. */ @FunctionalInterface public interface RerouteService { - void reroute(String reason, ActionListener listener); + + /** + * Schedule a cluster reroute. + * @param priority the (minimum) priority at which to run this reroute. If there is already a pending reroute at a higher priority then + * this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then + * the priority of the pending batch is raised to the given priority. + */ + void reroute(String reason, Priority priority, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 7177cf8bef4be..0e18ca3c0ea35 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; @@ -185,7 +186,7 @@ public void onNewInfo(ClusterInfo info) { if (reroute) { logger.info("rerouting shards: [{}]", explanation); - rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> { + rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> { setLastRunTimeMillis(); listener.onResponse(r); }, e -> { diff --git a/server/src/main/java/org/elasticsearch/common/Priority.java b/server/src/main/java/org/elasticsearch/common/Priority.java index bf7ded585d0a5..545c353e7ecf2 100644 --- a/server/src/main/java/org/elasticsearch/common/Priority.java +++ b/server/src/main/java/org/elasticsearch/common/Priority.java @@ -60,10 +60,18 @@ public static Priority fromByte(byte b) { this.value = value; } + /** + * @return whether tasks of {@code this} priority will run after those of priority {@code p}. + * For instance, {@code Priority.URGENT.after(Priority.IMMEDIATE)} returns {@code true}. + */ public boolean after(Priority p) { return this.compareTo(p) > 0; } + /** + * @return whether tasks of {@code this} priority will run no earlier than those of priority {@code p}. + * For instance, {@code Priority.URGENT.sameOrAfter(Priority.IMMEDIATE)} returns {@code true}. + */ public boolean sameOrAfter(Priority p) { return this.compareTo(p) >= 0; } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 108f8bacdd936..e18e51c76a477 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -106,6 +107,8 @@ public void applyFailedShards(final RoutingAllocation allocation, final List extends AsyncShardFetch @Override protected void reroute(ShardId shardId, String reason) { logger.trace("{} scheduling reroute for {}", shardId, reason); - rerouteService.reroute("async_shard_fetch", ActionListener.wrap( + assert rerouteService != null; + rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap( r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e))); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 70567da1aadfb..ef7567ea5df91 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -57,7 +57,7 @@ public void testJoinDeduplication() { x -> localNode, null, Collections.emptySet()); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), (s, r) -> {}); + Collections.emptyList(), (s, p, r) -> {}); transportService.start(); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); @@ -153,7 +153,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() { x -> localNode, null, Collections.emptySet()); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), (s, r) -> {}); // registers request handler + Collections.emptyList(), (s, p, r) -> {}); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index d65156cef0a29..8ac0c0455e5e9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -174,7 +174,7 @@ transportService, writableRegistry(), () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), Collections.emptyList(), - random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); + random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); transportService.start(); transportService.acceptIncomingRequests(); transport = capturingTransport; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java index 966ac1e60650d..c29174d055d05 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Randomness; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -32,12 +34,16 @@ import org.junit.After; import org.junit.Before; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import static org.hamcrest.Matchers.lessThan; @@ -70,13 +76,14 @@ public void testReroutesWhenRequested() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(iterations); for (int i = 0; i < iterations; i++) { rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get()); - batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + batchedRerouteService.reroute("iteration " + i, randomFrom(EnumSet.allOf(Priority.class)), + ActionListener.wrap(countDownLatch::countDown)); } countDownLatch.await(10, TimeUnit.SECONDS); assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get())); } - public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException { + public void testBatchesReroutesTogetherAtPriorityOfHighestSubmittedReroute() throws BrokenBarrierException, InterruptedException { final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() { @Override @@ -100,14 +107,77 @@ public void onFailure(String source, Exception e) { return s; }); - final int iterations = between(1, 100); - final CountDownLatch countDownLatch = new CountDownLatch(iterations); - for (int i = 0; i < iterations; i++) { - batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + final int iterations = scaledRandomIntBetween(1, 100); + final CountDownLatch tasksSubmittedCountDown = new CountDownLatch(iterations); + final CountDownLatch tasksCompletedCountDown = new CountDownLatch(iterations); + final List actions = new ArrayList<>(iterations); + final Function rerouteFromPriority = priority -> () -> { + final AtomicBoolean alreadyRun = new AtomicBoolean(); + batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> { + assertTrue(alreadyRun.compareAndSet(false, true)); + tasksCompletedCountDown.countDown(); + })); + tasksSubmittedCountDown.countDown(); + }; + actions.add(rerouteFromPriority.apply(Priority.URGENT)); // ensure at least one URGENT priority reroute + for (int i = 1; i < iterations; i++) { + final int iteration = i; + if (randomBoolean()) { + actions.add(rerouteFromPriority.apply(randomFrom(Priority.LOW, Priority.NORMAL, Priority.HIGH, Priority.URGENT))); + } else { + final Priority priority = randomFrom(Priority.NORMAL, Priority.HIGH, Priority.URGENT, Priority.IMMEDIATE); + final boolean submittedConcurrentlyWithReroute = randomBoolean(); + if (submittedConcurrentlyWithReroute == false) { + tasksSubmittedCountDown.countDown(); // this task might be submitted later + } + actions.add(() -> { + clusterService.submitStateUpdateTask("other task " + iteration + " at " + priority, + new ClusterStateUpdateTask(priority) { + + @Override + public ClusterState execute(ClusterState currentState) { + switch (priority) { + case IMMEDIATE: + if (submittedConcurrentlyWithReroute) { + assertFalse("should have rerouted after " + priority + " priority task", rerouteExecuted.get()); + } // else this task might be submitted too late to precede the reroute + break; + case URGENT: + // may run either before or after reroute + break; + case HIGH: + case NORMAL: + assertTrue("should have rerouted before " + priority + " priority task", rerouteExecuted.get()); + break; + default: + fail("unexpected priority: " + priority); + break; + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(source, e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + tasksCompletedCountDown.countDown(); + } + }); + if (submittedConcurrentlyWithReroute) { + tasksSubmittedCountDown.countDown(); + } + }); + } } + Randomness.shuffle(actions); + actions.forEach(threadPool.generic()::execute); + assertTrue(tasksSubmittedCountDown.await(10, TimeUnit.SECONDS)); cyclicBarrier.await(); // allow master thread to continue; - countDownLatch.await(); // wait for reroute to complete + assertTrue(tasksCompletedCountDown.await(10, TimeUnit.SECONDS)); // wait for reroute to complete assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once } @@ -123,7 +193,19 @@ public void testNotifiesOnFailure() throws InterruptedException { final int iterations = between(1, 100); final CountDownLatch countDownLatch = new CountDownLatch(iterations); for (int i = 0; i < iterations; i++) { - batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + batchedRerouteService.reroute("iteration " + i, + randomFrom(EnumSet.allOf(Priority.class)), ActionListener.wrap( + r -> { + countDownLatch.countDown(); + if (rarely()) { + throw new ElasticsearchException("failure during notification"); + } + }, e -> { + countDownLatch.countDown(); + if (randomBoolean()) { + throw new ElasticsearchException("failure during failure notification", e); + } + })); if (rarely()) { clusterService.getMasterService().setClusterStatePublisher( randomBoolean() diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index e46c899dbec91..213666b7011a8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.hamcrest.Matchers.equalTo; public class DiskThresholdMonitorTests extends ESAllocationTestCase { @@ -77,10 +79,12 @@ public void testMarkFloodStageIndicesReadOnly() { AtomicReference> indices = new AtomicReference<>(); AtomicLong currentTime = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { - assertTrue(reroute.compareAndSet(false, true)); - listener.onResponse(null); - }) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, + (reason, priority, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + assertThat(priority, equalTo(Priority.HIGH)); + listener.onResponse(null); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); @@ -117,10 +121,12 @@ protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionList assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { - assertTrue(reroute.compareAndSet(false, true)); - listener.onResponse(null); - }) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, + (reason, priority, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + assertThat(priority, equalTo(Priority.HIGH)); + listener.onResponse(null); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); @@ -144,10 +150,12 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() { AtomicLong currentTime = new AtomicLong(); AtomicReference> listenerReference = new AtomicReference<>(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { - assertNotNull(listener); - assertTrue(listenerReference.compareAndSet(null, listener)); - }) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, + (reason, priority, listener) -> { + assertNotNull(listener); + assertThat(priority, equalTo(Priority.HIGH)); + assertTrue(listenerReference.compareAndSet(null, listener)); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { throw new AssertionError("unexpected"); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 814119f89f070..6cb56492935dd 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -219,7 +219,7 @@ allocationService, new AliasValidator(), environment, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, (s, r) -> {}); + joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, (s, p, r) -> {}); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 615650250433c..a3dce45daf101 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -887,7 +887,7 @@ protected Optional getDisruptableMockTransport(Transpo final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), allocationService, masterService, this::getPersistedState, - Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {}, + Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {}, getElectionStrategy()); masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,