From 99562c0497fbfa0c4c86f455ae25ef9fa36cf5b4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 13 Jun 2018 20:49:50 -0400 Subject: [PATCH 1/7] TEST: getCapturedRequestsAndClear should be atomic We might lose requests that are captured between the snapshot and the clear inside getCapturedRequestsAndClear. This commit makes sure that both getCapturedRequestsAndClear and getCapturedRequestsByTargetNodeAndClear are atomic. --- .../test/transport/CapturingTransport.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 81fc934ca6d7e..a7a0eb446e8fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -46,6 +46,7 @@ import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -94,9 +95,23 @@ public CapturedRequest[] capturedRequests() { * @return the captured requests */ public CapturedRequest[] getCapturedRequestsAndClear() { - CapturedRequest[] capturedRequests = capturedRequests(); - clear(); - return capturedRequests; + List requests = new ArrayList<>(capturedRequests.size()); + capturedRequests.drainTo(requests); + return requests.toArray(new CapturedRequest[0]); + } + + private Map> groupRequestsByTargetNode(Collection requests) { + Map> result = new HashMap<>(); + for (CapturedRequest request : requests) { + result.compute(request.node.getId(), (k, group) -> { + if (group == null) { + group = new ArrayList<>(); + } + group.add(request); + return group; + }); + } + return result; } /** @@ -104,16 +119,7 @@ public CapturedRequest[] getCapturedRequestsAndClear() { * Doesn't clear the captured request list. See {@link #clear()} */ public Map> capturedRequestsByTargetNode() { - Map> map = new HashMap<>(); - for (CapturedRequest request : capturedRequests) { - List nodeList = map.get(request.node.getId()); - if (nodeList == null) { - nodeList = new ArrayList<>(); - map.put(request.node.getId(), nodeList); - } - nodeList.add(request); - } - return map; + return groupRequestsByTargetNode(capturedRequests); } /** @@ -125,9 +131,9 @@ public Map> capturedRequestsByTargetNode() { * @return the captured requests grouped by target node */ public Map> getCapturedRequestsByTargetNodeAndClear() { - Map> map = capturedRequestsByTargetNode(); - clear(); - return map; + List requests = new ArrayList<>(capturedRequests.size()); + capturedRequests.drainTo(requests); + return groupRequestsByTargetNode(requests); } /** clears captured requests */ From 251d44ca6d019d1c0a518beeb4a0cd9f91ff55d8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 13 Jun 2018 19:01:22 -0400 Subject: [PATCH 2/7] Avoid sending duplicate remote failed shard requests Today, if a replica fails, we send a shard failed request to the master node for each replication request until the primary receives the new cluster state. However, if bulk requests are large and the master node is busy, we might overwhelm the cluster with shard failed requests. This commit minimizes the number of shard failed requests in this scenario by caching the ongoing requests, as sketched below.
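The caching idea is easiest to see in isolation: in-flight shard failed requests are keyed in a concurrent map, only the first report for a given key is sent to the master, and later reports for the same key attach to the request that is already on the wire. The following is a minimal sketch of that pattern, not the actual ShardStateAction code: FailedShardReporter, FailedShardKey and sendShardFailedToMaster are invented names, and a CompletableFuture stands in for the CompositeListener that this patch adds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FailedShardReporter {

    // Stand-in for FailedShardEntry; like the patch, equality deliberately ignores the
    // failure message/exception so that equivalent reports collapse onto a single key.
    record FailedShardKey(String shardId, String allocationId, long primaryTerm, boolean markAsStale) {}

    private final ConcurrentMap<FailedShardKey, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    CompletableFuture<Void> remoteShardFailed(FailedShardKey key) {
        CompletableFuture<Void> request = new CompletableFuture<>();
        CompletableFuture<Void> existing = inFlight.putIfAbsent(key, request);
        if (existing != null) {
            // The same shard failure has already been reported and is still pending:
            // piggyback on it instead of sending a duplicate request to the master.
            return existing;
        }
        // First report for this key: actually send it, and drop the cache entry once the
        // master has answered so that a later failure starts a fresh request.
        sendShardFailedToMaster(key).whenComplete((ignored, error) -> {
            if (error != null) {
                request.completeExceptionally(error);
            } else {
                request.complete(null);
            }
            inFlight.remove(key);
        });
        return request;
    }

    // Placeholder for the transport call to the master node.
    private CompletableFuture<Void> sendShardFailedToMaster(FailedShardKey key) {
        return CompletableFuture.completedFuture(null);
    }
}
```

As in the patch, the sketch notifies the waiting callers before removing the cache entry, so a report that arrives after removal starts a fresh request rather than attaching to one that has already completed.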
This was discussed at https://discuss.elastic.co/t/half-dead-node-lead-to-cluster-hang/113658/25 --- .../action/shard/ShardStateAction.java | 125 ++++++++++++++- ...rdFailedClusterStateTaskExecutorTests.java | 32 ++-- .../action/shard/ShardStateActionTests.java | 147 +++++++++++++++++- 3 files changed, 288 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 915e900b9ddf1..fe4b7d0684560 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,10 +25,10 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; @@ -48,6 +48,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; @@ -68,7 +69,9 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.function.Predicate; public class ShardStateAction extends AbstractComponent { @@ -80,6 +83,10 @@ public class ShardStateAction extends AbstractComponent { private final ClusterService clusterService; private final ThreadPool threadPool; + // a list of shards that failed during replication + // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. 
+ private final ConcurrentMap remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap(); + @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) { @@ -146,8 +153,35 @@ private static boolean isMasterChannelException(TransportException exp) { */ public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, Listener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; - FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); - sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, listener); + final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); + final CompositeListener compositeListener = new CompositeListener(listener); + final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener); + if (existingListener == null) { + sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() { + @Override + public void onSuccess() { + try { + compositeListener.onSuccess(); + } finally { + remoteFailedShardsCache.remove(shardEntry); + } + } + @Override + public void onFailure(Exception e) { + try { + compositeListener.onFailure(e); + } finally { + remoteFailedShardsCache.remove(shardEntry); + } + } + }); + } else { + existingListener.addListener(listener); + } + } + + int remoteShardFailedCacheSize() { + return remoteFailedShardsCache.size(); } /** @@ -414,6 +448,23 @@ public String toString() { components.add("markAsStale [" + markAsStale + "]"); return String.join(", ", components); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FailedShardEntry that = (FailedShardEntry) o; + // Exclude message and exception from equals and hashCode + return Objects.equals(this.shardId, that.shardId) && + Objects.equals(this.allocationId, that.allocationId) && + primaryTerm == that.primaryTerm && + markAsStale == that.markAsStale; + } + + @Override + public int hashCode() { + return Objects.hash(shardId, allocationId, primaryTerm, markAsStale); + } } public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { @@ -585,6 +636,74 @@ default void onFailure(final Exception e) { } + /** + * A composite listener that allows registering multiple listeners dynamically. 
+ */ + static final class CompositeListener implements Listener { + private boolean isNotified = false; + private Exception failure = null; + private final List listeners = new ArrayList<>(); + + CompositeListener(Listener listener) { + listeners.add(listener); + } + + void addListener(Listener listener) { + final boolean ready; + synchronized (this) { + ready = this.isNotified; + if (ready == false) { + listeners.add(listener); + } + } + if (ready) { + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onSuccess(); + } + } + } + + private void onCompleted(Exception failure) { + final List listeners; + synchronized (this) { + this.failure = failure; + listeners = this.listeners; + this.isNotified = true; + } + Exception firstException = null; + for (Listener listener : listeners) { + try { + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onSuccess(); + } + } catch (Exception innerEx) { + if (firstException == null) { + firstException = innerEx; + } else { + firstException.addSuppressed(innerEx); + } + } + } + if (firstException != null) { + throw new ElasticsearchException("failed to notify listener", firstException); + } + } + + @Override + public void onSuccess() { + onCompleted(null); + } + + @Override + public void onFailure(Exception failure) { + onCompleted(failure); + } + } + public static class NoLongerPrimaryShardException extends ElasticsearchException { public NoLongerPrimaryShardException(ShardId shardId, String msg) { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 9eeef54dfd796..02dc581caf15a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -22,11 +22,11 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; -import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -52,10 +52,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -131,9 +131,14 @@ ClusterState applyFailedShards(ClusterState currentState, List fail tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = - failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); - 
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); + Map taskResultMap = new IdentityHashMap<>(); + for (FailedShardEntry failingTask : failingTasks) { + taskResultMap.put(failingTask, + ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))); + } + for (FailedShardEntry nonExistentTask : nonExistentTasks) { + taskResultMap.put(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success()); + } assertTaskResults(taskResultMap, result, currentState, false); } @@ -147,12 +152,13 @@ public void testIllegalShardFailureRequests() throws Exception { tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId, randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean())); } - Map taskResultMap = - tasks.stream().collect(Collectors.toMap( - Function.identity(), - task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, + Map taskResultMap = new IdentityHashMap<>(); + for (FailedShardEntry task : tasks) { + taskResultMap.put(task, + ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term [" + task.primaryTerm + "] did not match current primary term [" + - currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))); + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))); + } ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } @@ -251,8 +257,10 @@ private static void assertTasksSuccessful( ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = - tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())); + Map taskResultMap = new IdentityHashMap<>(); + for (FailedShardEntry task : tasks) { + taskResultMap.put(task, ClusterStateTaskExecutor.TaskResult.success()); + } assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index bbd326ff2fedb..d53c3a64e7e90 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -59,9 +59,10 @@ import org.junit.BeforeClass; import java.io.IOException; -import java.util.UUID; import java.util.Collections; +import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -73,6 +74,8 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -138,6 +141,7 @@ public void tearDown() throws Exception 
{ clusterService.close(); transportService.close(); super.tearDown(); + assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0)); } @AfterClass @@ -381,6 +385,90 @@ public void onFailure(Exception e) { assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage())); } + public void testCacheRemoteShardFailed() throws Exception { + final String index = "test"; + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + ShardRouting failedShard = getRandomShardRouting(index); + long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); + boolean markAsStale = randomBoolean(); + int numListeners = between(1, 100); + CountDownLatch latch = new CountDownLatch(numListeners); + for (int i = 0; i < numListeners; i++) { + shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), + primaryTerm + 1, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + latch.countDown(); + } + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + } + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests, arrayWithSize(1)); + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + latch.await(); + assertThat(transport.capturedRequests(), arrayWithSize(0)); + } + + public void testRemoteShardFailedConcurrently() throws Exception { + final String index = "test"; + final AtomicBoolean shutdown = new AtomicBoolean(false); + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + ShardRouting[] failedShards = new ShardRouting[between(1, 5)]; + for (int i = 0; i < failedShards.length; i++) { + failedShards[i] = getRandomShardRouting(index); + } + Thread[] clientThreads = new Thread[between(1, 6)]; + int iterationsPerThread = scaledRandomIntBetween(50, 500); + Phaser barrier = new Phaser(clientThreads.length + 2); // one for master thread, one for the main thread + Thread masterThread = new Thread(() -> { + barrier.arriveAndAwaitAdvance(); + while (shutdown.get() == false) { + for (CapturingTransport.CapturedRequest request : transport.getCapturedRequestsAndClear()) { + if (randomBoolean()) { + transport.handleResponse(request.requestId, TransportResponse.Empty.INSTANCE); + } else { + transport.handleRemoteError(request.requestId, randomFrom(getSimulatedFailure())); + } + } + } + }); + masterThread.start(); + + AtomicInteger notifiedResponses = new AtomicInteger(); + for (int t = 0; t < clientThreads.length; t++) { + long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShards[0].id()); + clientThreads[t] = new Thread(() -> { + barrier.arriveAndAwaitAdvance(); + for (int i = 0; i < iterationsPerThread; i++) { + ShardRouting failedShard = randomFrom(failedShards); + shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), + primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + notifiedResponses.incrementAndGet(); + } + @Override + public void onFailure(Exception e) { + notifiedResponses.incrementAndGet(); + } + }); + } + }); + clientThreads[t].start(); + } + barrier.arriveAndAwaitAdvance(); + for (Thread t : clientThreads) { + t.join(); + } + assertBusy(() -> 
assertThat(notifiedResponses.get(), equalTo(clientThreads.length * iterationsPerThread))); + shutdown.set(true); + masterThread.join(); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); @@ -452,4 +540,61 @@ BytesReference serialize(Writeable writeable, Version version) throws IOExceptio return out.bytes(); } } + + public void testCompositeListener() throws Exception { + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failureCount = new AtomicInteger(); + Exception failure = randomBoolean() ? getSimulatedFailure() : null; + ShardStateAction.CompositeListener compositeListener = new ShardStateAction.CompositeListener(new ShardStateAction.Listener() { + @Override + public void onSuccess() { + successCount.incrementAndGet(); + } + @Override + public void onFailure(Exception e) { + assertThat(e, sameInstance(failure)); + failureCount.incrementAndGet(); + } + }); + int iterationsPerThread = scaledRandomIntBetween(100, 1000); + Thread[] threads = new Thread[between(1, 4)]; + Phaser barrier = new Phaser(threads.length + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + barrier.arriveAndAwaitAdvance(); + for (int n = 0; n < iterationsPerThread; n++) { + compositeListener.addListener(new ShardStateAction.Listener() { + @Override + public void onSuccess() { + successCount.incrementAndGet(); + } + @Override + public void onFailure(Exception e) { + assertThat(e, sameInstance(failure)); + failureCount.incrementAndGet(); + } + }); + } + }); + threads[i].start(); + } + barrier.arriveAndAwaitAdvance(); + if (failure != null) { + compositeListener.onFailure(failure); + } else { + compositeListener.onSuccess(); + } + for (Thread t : threads) { + t.join(); + } + assertBusy(() -> { + if (failure != null) { + assertThat(successCount.get(), equalTo(0)); + assertThat(failureCount.get(), equalTo(threads.length*iterationsPerThread + 1)); + } else { + assertThat(successCount.get(), equalTo(threads.length*iterationsPerThread + 1)); + assertThat(failureCount.get(), equalTo(0)); + } + }); + } } From 6a704c7286ca32285b9a400ee47e4b4b92fe0d90 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Jun 2018 09:47:31 -0400 Subject: [PATCH 3/7] Simplify report --- .../cluster/action/shard/ShardStateAction.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index fe4b7d0684560..f690efa4c9a0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -666,13 +666,11 @@ void addListener(Listener listener) { } private void onCompleted(Exception failure) { - final List listeners; synchronized (this) { this.failure = failure; - listeners = this.listeners; this.isNotified = true; } - Exception firstException = null; + RuntimeException firstException = null; for (Listener listener : listeners) { try { if (failure != null) { @@ -680,7 +678,7 @@ private void onCompleted(Exception failure) { } else { listener.onSuccess(); } - } catch (Exception innerEx) { + } catch (RuntimeException innerEx) { if (firstException == null) { firstException = innerEx; } else { @@ -689,7 +687,7 @@ private void 
onCompleted(Exception failure) { } } if (firstException != null) { - throw new ElasticsearchException("failed to notify listener", firstException); + throw firstException; } } From 7601a13690667806fbc8b9350ffe286eea9bb0af Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Jun 2018 10:00:57 -0400 Subject: [PATCH 4/7] Remove identity map --- ...rdFailedClusterStateTaskExecutorTests.java | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 02dc581caf15a..5bf62faffabcf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.StaleShard; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; @@ -52,9 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -131,13 +130,13 @@ ClusterState applyFailedShards(ClusterState currentState, List fail tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = new IdentityHashMap<>(); + List> taskResultMap = new ArrayList<>(); for (FailedShardEntry failingTask : failingTasks) { - taskResultMap.put(failingTask, - ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))); + taskResultMap.add(Tuple.tuple(failingTask, + ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); } for (FailedShardEntry nonExistentTask : nonExistentTasks) { - taskResultMap.put(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success()); + taskResultMap.add(Tuple.tuple(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success())); } assertTaskResults(taskResultMap, result, currentState, false); } @@ -152,13 +151,12 @@ public void testIllegalShardFailureRequests() throws Exception { tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId, randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean())); } - Map taskResultMap = new IdentityHashMap<>(); - for (FailedShardEntry task : tasks) { - taskResultMap.put(task, - ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, - "primary term [" + task.primaryTerm + "] did not match current primary term [" + - currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))); - } + List> taskResultMap = tasks.stream() + .map(task -> Tuple.tuple(task, ClusterStateTaskExecutor.TaskResult.failure( + new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term [" + + task.primaryTerm + "] did not 
match current primary term [" + + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))) + .collect(Collectors.toList()); ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } @@ -257,15 +255,13 @@ private static void assertTasksSuccessful( ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = new IdentityHashMap<>(); - for (FailedShardEntry task : tasks) { - taskResultMap.put(task, ClusterStateTaskExecutor.TaskResult.success()); - } + List> taskResultMap = tasks.stream() + .map(t -> Tuple.tuple(t, ClusterStateTaskExecutor.TaskResult.success())).collect(Collectors.toList()); assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - Map taskResultMap, + List> taskResultMap, ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged @@ -273,30 +269,30 @@ private static void assertTaskResults( // there should be as many task results as tasks assertEquals(taskResultMap.size(), result.executionResults.size()); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Tuple entry : taskResultMap) { // every task should have a corresponding task result - assertTrue(result.executionResults.containsKey(entry.getKey())); + assertTrue(result.executionResults.containsKey(entry.v1())); // the task results are as expected - assertEquals(entry.getKey().toString(), entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess()); + assertEquals(entry.v1().toString(), entry.v2().isSuccess(), result.executionResults.get(entry.v1()).isSuccess()); } List shards = clusterState.getRoutingTable().allShards(); - for (Map.Entry entry : taskResultMap.entrySet()) { - if (entry.getValue().isSuccess()) { + for (Tuple entry : taskResultMap) { + if (entry.v2().isSuccess()) { // the shard was successfully failed and so should not be in the routing table for (ShardRouting shard : shards) { if (shard.assignedToNode()) { - assertFalse("entry key " + entry.getKey() + ", shard routing " + shard, - entry.getKey().getShardId().equals(shard.shardId()) && - entry.getKey().getAllocationId().equals(shard.allocationId().getId())); + assertFalse("entry key " + entry.v1() + ", shard routing " + shard, + entry.v1().getShardId().equals(shard.shardId()) && + entry.v1().getAllocationId().equals(shard.allocationId().getId())); } } } else { // check we saw the expected failure - ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.getKey()); - assertThat(actualResult.getFailure(), instanceOf(entry.getValue().getFailure().getClass())); - assertThat(actualResult.getFailure().getMessage(), equalTo(entry.getValue().getFailure().getMessage())); + ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.v1()); + assertThat(actualResult.getFailure(), instanceOf(entry.v2().getFailure().getClass())); + assertThat(actualResult.getFailure().getMessage(), equalTo(entry.v2().getFailure().getMessage())); } } From 19baf3179d1acdd07e77ad32af32d0e9f6f15914 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Jun 2018 21:39:20 -0400 Subject: [PATCH 5/7] Revert "TEST: getCapturedRequestsAndClear should be atomic" This reverts commit 99562c0497fbfa0c4c86f455ae25ef9fa36cf5b4. 
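For context on what is being reverted: patch 1 replaced the snapshot-then-clear implementation of getCapturedRequestsAndClear with a drainTo call, because a request captured by another thread after the snapshot but before clear() is silently dropped. Below is a minimal sketch of the difference, assuming the captured requests sit in a BlockingQueue (which the drainTo call in patch 1 implies); the Request record is a stand-in for CapturedRequest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CaptureExample {

    record Request(long requestId) {}

    private final BlockingQueue<Request> captured = new LinkedBlockingQueue<>();

    // Snapshot-then-clear (the behaviour this revert restores): a request enqueued by another
    // thread after toArray() but before clear() is wiped without ever being returned.
    Request[] snapshotThenClear() {
        Request[] snapshot = captured.toArray(new Request[0]);
        captured.clear();
        return snapshot;
    }

    // drainTo (what patch 1 introduced): every element is removed exactly once, so a concurrently
    // enqueued request is either returned by this call or left in the queue for the next one.
    Request[] drainAndReturn() {
        List<Request> requests = new ArrayList<>(captured.size());
        captured.drainTo(requests);
        return requests.toArray(new Request[0]);
    }
}
```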
--- .../test/transport/CapturingTransport.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index a7a0eb446e8fb..81fc934ca6d7e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -46,7 +46,6 @@ import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -95,23 +94,9 @@ public CapturedRequest[] capturedRequests() { * @return the captured requests */ public CapturedRequest[] getCapturedRequestsAndClear() { - List requests = new ArrayList<>(capturedRequests.size()); - capturedRequests.drainTo(requests); - return requests.toArray(new CapturedRequest[0]); - } - - private Map> groupRequestsByTargetNode(Collection requests) { - Map> result = new HashMap<>(); - for (CapturedRequest request : requests) { - result.compute(request.node.getId(), (k, group) -> { - if (group == null) { - group = new ArrayList<>(); - } - group.add(request); - return group; - }); - } - return result; + CapturedRequest[] capturedRequests = capturedRequests(); + clear(); + return capturedRequests; } /** @@ -119,7 +104,16 @@ private Map> groupRequestsByTargetNode(Collection< * Doesn't clear the captured request list. See {@link #clear()} */ public Map> capturedRequestsByTargetNode() { - return groupRequestsByTargetNode(capturedRequests); + Map> map = new HashMap<>(); + for (CapturedRequest request : capturedRequests) { + List nodeList = map.get(request.node.getId()); + if (nodeList == null) { + nodeList = new ArrayList<>(); + map.put(request.node.getId(), nodeList); + } + nodeList.add(request); + } + return map; } /** @@ -131,9 +125,9 @@ public Map> capturedRequestsByTargetNode() { * @return the captured requests grouped by target node */ public Map> getCapturedRequestsByTargetNodeAndClear() { - List requests = new ArrayList<>(capturedRequests.size()); - capturedRequests.drainTo(requests); - return groupRequestsByTargetNode(requests); + Map> map = capturedRequestsByTargetNode(); + clear(); + return map; } /** clears captured requests */ From a1025319cf46bf00f68b744c26b679e8292786e3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 18 Jun 2018 12:45:41 -0400 Subject: [PATCH 6/7] =?UTF-8?q?Address=20Yannick=E2=80=99s=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...rdFailedClusterStateTaskExecutorTests.java | 24 +++++++++---------- .../action/shard/ShardStateActionTests.java | 3 +-- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 5bf62faffabcf..01d0c518c1be7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -130,15 +130,15 @@ ClusterState applyFailedShards(ClusterState currentState, List fail tasks.addAll(failingTasks); 
tasks.addAll(nonExistentTasks); ClusterStateTaskExecutor.ClusterTasksResult result = failingExecutor.execute(currentState, tasks); - List> taskResultMap = new ArrayList<>(); + List> taskResultList = new ArrayList<>(); for (FailedShardEntry failingTask : failingTasks) { - taskResultMap.add(Tuple.tuple(failingTask, + taskResultList.add(Tuple.tuple(failingTask, ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); } for (FailedShardEntry nonExistentTask : nonExistentTasks) { - taskResultMap.add(Tuple.tuple(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success())); + taskResultList.add(Tuple.tuple(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success())); } - assertTaskResults(taskResultMap, result, currentState, false); + assertTaskResults(taskResultList, result, currentState, false); } public void testIllegalShardFailureRequests() throws Exception { @@ -151,14 +151,14 @@ public void testIllegalShardFailureRequests() throws Exception { tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId, randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean())); } - List> taskResultMap = tasks.stream() + List> taskResultList = tasks.stream() .map(task -> Tuple.tuple(task, ClusterStateTaskExecutor.TaskResult.failure( new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term [" + task.primaryTerm + "] did not match current primary term [" + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))) .collect(Collectors.toList()); ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(currentState, tasks); - assertTaskResults(taskResultMap, result, currentState, false); + assertTaskResults(taskResultList, result, currentState, false); } public void testMarkAsStaleWhenFailingShard() throws Exception { @@ -255,21 +255,21 @@ private static void assertTasksSuccessful( ClusterState clusterState, boolean clusterStateChanged ) { - List> taskResultMap = tasks.stream() + List> taskResultList = tasks.stream() .map(t -> Tuple.tuple(t, ClusterStateTaskExecutor.TaskResult.success())).collect(Collectors.toList()); - assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); + assertTaskResults(taskResultList, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - List> taskResultMap, + List> taskResultList, ClusterStateTaskExecutor.ClusterTasksResult result, ClusterState clusterState, boolean clusterStateChanged ) { // there should be as many task results as tasks - assertEquals(taskResultMap.size(), result.executionResults.size()); + assertEquals(taskResultList.size(), result.executionResults.size()); - for (Tuple entry : taskResultMap) { + for (Tuple entry : taskResultList) { // every task should have a corresponding task result assertTrue(result.executionResults.containsKey(entry.v1())); @@ -278,7 +278,7 @@ private static void assertTaskResults( } List shards = clusterState.getRoutingTable().allShards(); - for (Tuple entry : taskResultMap) { + for (Tuple entry : taskResultList) { if (entry.v2().isSuccess()) { // the shard was successfully failed and so should not be in the routing table for (ShardRouting shard : shards) { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index d53c3a64e7e90..c80b586da5bd7 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -440,13 +440,12 @@ public void testRemoteShardFailedConcurrently() throws Exception { AtomicInteger notifiedResponses = new AtomicInteger(); for (int t = 0; t < clientThreads.length; t++) { - long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShards[0].id()); clientThreads[t] = new Thread(() -> { barrier.arriveAndAwaitAdvance(); for (int i = 0; i < iterationsPerThread; i++) { ShardRouting failedShard = randomFrom(failedShards); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() { + randomLongBetween(1, Long.MAX_VALUE), randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { notifiedResponses.incrementAndGet(); From 496a6bdfcf011d35a03947984ba618855ee3eca7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 18 Jun 2018 12:49:05 -0400 Subject: [PATCH 7/7] Randomize primary term --- .../cluster/action/shard/ShardStateActionTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index c80b586da5bd7..1d78cdeb98374 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -389,13 +389,13 @@ public void testCacheRemoteShardFailed() throws Exception { final String index = "test"; setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); ShardRouting failedShard = getRandomShardRouting(index); - long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); boolean markAsStale = randomBoolean(); int numListeners = between(1, 100); CountDownLatch latch = new CountDownLatch(numListeners); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); for (int i = 0; i < numListeners; i++) { shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm + 1, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + primaryTerm, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { latch.countDown();