Modifying: test avoid sending duplicate remote failed shard requests (#1)
haveTryTwo committed Jul 29, 2023
1 parent df9fc52 commit ddf98c1
Showing 5 changed files with 197 additions and 46 deletions.
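The new tests exercise a deduplication path in ShardStateAction: concurrent callers reporting the same failed shard copy should share one in-flight request to the master, and every caller's listener should be notified once that shared request completes (see remoteShardFailedCacheSize() and CompositeListener in the diff below). The following is a minimal sketch of that idea only, under stated assumptions: the class, the string key, and the sendRemoteRequest helper are illustrative stand-ins, not the ShardStateAction internals from this commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: de-duplicates in-flight "remote shard failed" requests.
final class RemoteShardFailedDeduper {

    interface Listener {
        void onSuccess();
        void onFailure(Exception e);
    }

    // Fans a single result out to every listener attached to the same request.
    static final class CompositeListener implements Listener {
        private final List<Listener> listeners = new ArrayList<>();
        private boolean done;
        private Exception failure; // null on success

        CompositeListener(Listener first) {
            listeners.add(first);
        }

        void addListener(Listener listener) {
            Exception f;
            synchronized (this) {
                if (done == false) {
                    listeners.add(listener); // request still in flight: queue the listener
                    return;
                }
                f = failure;
            }
            // the shared request already finished: notify the latecomer directly
            if (f == null) listener.onSuccess(); else listener.onFailure(f);
        }

        @Override
        public void onSuccess() { finish(null); }

        @Override
        public void onFailure(Exception e) { finish(e); }

        private void finish(Exception e) {
            List<Listener> toNotify;
            synchronized (this) {
                done = true;
                failure = e;
                toNotify = new ArrayList<>(listeners);
            }
            for (Listener listener : toNotify) {
                if (e == null) listener.onSuccess(); else listener.onFailure(e);
            }
        }
    }

    // One entry per in-flight failed shard copy; the string key is illustrative.
    private final ConcurrentMap<String, CompositeListener> inFlight = new ConcurrentHashMap<>();

    int cacheSize() {
        return inFlight.size();
    }

    void remoteShardFailed(String shardId, String allocationId, long primaryTerm, boolean markAsStale, Listener listener) {
        final String key = shardId + "#" + allocationId + "#" + primaryTerm + "#" + markAsStale;
        CompositeListener existing = inFlight.putIfAbsent(key, new CompositeListener(listener));
        if (existing != null) {
            existing.addListener(listener); // duplicate: piggyback on the request already in flight
            return;
        }
        // first caller for this key: send one request, then complete and evict the entry
        sendRemoteRequest(key, new Listener() {
            @Override
            public void onSuccess() {
                inFlight.remove(key).onSuccess();
            }

            @Override
            public void onFailure(Exception e) {
                inFlight.remove(key).onFailure(e);
            }
        });
    }

    // Stand-in for the transport call to the master node.
    private void sendRemoteRequest(String key, Listener listener) {
        listener.onSuccess();
    }
}

The added test cases in ShardStateActionTests map onto this behaviour: testCacheRemoteShardFailed asserts that many listeners for the same shard failure produce a single captured transport request, and testCompositeListener asserts that one success or failure reaches every attached listener exactly once.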
@@ -488,7 +488,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
TransportShardBulkAction.class); // NOTE:htt, maps BulkAction
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
@@ -37,14 +37,14 @@

public class MainResponse extends ActionResponse implements ToXContentObject {

private String nodeName;
private Version version;
private ClusterName clusterName;
private String nodeName; // NOTE:htt, node name
private Version version; // NOTE:htt, ES version
private ClusterName clusterName; // NOTE:htt, cluster name
private String clusterUuid;
private Build build;
boolean available;

private String privateESVersion = "6.3.2.5"; // NOTE:htt, 区分小版本
private String privateESVersion = "6.3.2.6"; // NOTE:htt, 区分小版本

MainResponse() {
}
@@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskResult;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -43,6 +44,7 @@
import org.elasticsearch.cluster.routing.allocation.StaleShard;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
@@ -132,10 +134,15 @@ ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> fail
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = failingExecutor.execute(currentState, tasks);
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
assertTaskResults(taskResultMap, result, currentState, false);
List<Tuple<FailedShardEntry, TaskResult>> taskResultList = new ArrayList<>();
for (FailedShardEntry failingTask : failingTasks) {
taskResultList.add(Tuple.tuple(failingTask,
ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
}
for (FailedShardEntry nonExistentTask : nonExistentTasks) {
taskResultList.add(Tuple.tuple(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success()));
}
assertTaskResults(taskResultList, result, currentState, false);
}

public void testIllegalShardFailureRequests() throws Exception {
@@ -148,14 +155,14 @@ public void testIllegalShardFailureRequests() throws Exception {
tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId,
randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean()));
}
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(
Function.identity(),
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
"primary term [" + task.primaryTerm + "] did not match current primary term [" +
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))));
List<Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult>> taskResultList = tasks.stream()
.map(task -> Tuple.tuple(task, ClusterStateTaskExecutor.TaskResult.failure(
new ShardStateAction.NoLongerPrimaryShardException(task.shardId, "primary term ["
+ task.primaryTerm + "] did not match current primary term ["
+ currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))))
.collect(Collectors.toList());
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = executor.execute(currentState, tasks);
assertTaskResults(taskResultMap, result, currentState, false);
assertTaskResults(taskResultList, result, currentState, false);
}

public void testMarkAsStaleWhenFailingShard() throws Exception {
@@ -252,44 +259,44 @@ private static void assertTasksSuccessful(
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()));
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
List<Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult>> taskResultList = tasks.stream()
.map(t -> Tuple.tuple(t, ClusterStateTaskExecutor.TaskResult.success())).collect(Collectors.toList());
assertTaskResults(taskResultList, result, clusterState, clusterStateChanged);
}

private static void assertTaskResults(
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
List<Tuple<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult>> taskResultList,
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.FailedShardEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
// there should be as many task results as tasks
assertEquals(taskResultMap.size(), result.executionResults.size());
assertEquals(taskResultList.size(), result.executionResults.size());

for (Map.Entry<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
for (Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultList) {
// every task should have a corresponding task result
assertTrue(result.executionResults.containsKey(entry.getKey()));
assertTrue(result.executionResults.containsKey(entry.v1()));

// the task results are as expected
assertEquals(entry.getKey().toString(), entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess());
assertEquals(entry.v1().toString(), entry.v2().isSuccess(), result.executionResults.get(entry.v1()).isSuccess());
}

List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
for (Map.Entry<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultMap.entrySet()) {
if (entry.getValue().isSuccess()) {
for (Tuple<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> entry : taskResultList) {
if (entry.v2().isSuccess()) {
// the shard was successfully failed and so should not be in the routing table
for (ShardRouting shard : shards) {
if (shard.assignedToNode()) {
assertFalse("entry key " + entry.getKey() + ", shard routing " + shard,
entry.getKey().getShardId().equals(shard.shardId()) &&
entry.getKey().getAllocationId().equals(shard.allocationId().getId()));
assertFalse("entry key " + entry.v1() + ", shard routing " + shard,
entry.v1().getShardId().equals(shard.shardId()) &&
entry.v1().getAllocationId().equals(shard.allocationId().getId()));
}
}
} else {
// check we saw the expected failure
ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.getKey());
assertThat(actualResult.getFailure(), instanceOf(entry.getValue().getFailure().getClass()));
assertThat(actualResult.getFailure().getMessage(), equalTo(entry.getValue().getFailure().getMessage()));
ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.v1());
assertThat(actualResult.getFailure(), instanceOf(entry.v2().getFailure().getClass()));
assertThat(actualResult.getFailure().getMessage(), equalTo(entry.v2().getFailure().getMessage()));
}
}

@@ -19,6 +19,7 @@

package org.elasticsearch.cluster.action.shard;

import java.util.concurrent.Phaser;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@@ -73,9 +74,11 @@
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

public class ShardStateActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
@@ -138,6 +141,7 @@ public void tearDown() throws Exception {
clusterService.close();
transportService.close();
super.tearDown();
assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0));
}

@AfterClass
@@ -381,6 +385,89 @@ public void onFailure(Exception e) {
assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage()));
}

public void testCacheRemoteShardFailed() throws Exception {
final String index = "test";
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
ShardRouting failedShard = getRandomShardRouting(index);
boolean markAsStale = randomBoolean();
int numListeners = between(1, 100);
CountDownLatch latch = new CountDownLatch(numListeners);
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
for (int i = 0; i < numListeners; i++) {
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(),
primaryTerm, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
latch.countDown();
}
@Override
public void onFailure(Exception e) {
latch.countDown();
}
});
}
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
latch.await();
assertThat(transport.capturedRequests(), arrayWithSize(0));
}

public void testRemoteShardFailedConcurrently() throws Exception {
final String index = "test";
final AtomicBoolean shutdown = new AtomicBoolean(false);
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
ShardRouting[] failedShards = new ShardRouting[between(1, 5)];
for (int i = 0; i < failedShards.length; i++) {
failedShards[i] = getRandomShardRouting(index);
}
Thread[] clientThreads = new Thread[between(1, 6)];
int iterationsPerThread = scaledRandomIntBetween(50, 500);
Phaser barrier = new Phaser(clientThreads.length + 2); // one for master thread, one for the main thread
Thread masterThread = new Thread(() -> {
barrier.arriveAndAwaitAdvance();
while (shutdown.get() == false) {
for (CapturingTransport.CapturedRequest request : transport.getCapturedRequestsAndClear()) {
if (randomBoolean()) {
transport.handleResponse(request.requestId, TransportResponse.Empty.INSTANCE);
} else {
transport.handleRemoteError(request.requestId, randomFrom(getSimulatedFailure()));
}
}
}
});
masterThread.start();

AtomicInteger notifiedResponses = new AtomicInteger();
for (int t = 0; t < clientThreads.length; t++) {
clientThreads[t] = new Thread(() -> {
barrier.arriveAndAwaitAdvance();
for (int i = 0; i < iterationsPerThread; i++) {
ShardRouting failedShard = randomFrom(failedShards);
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(),
randomLongBetween(1, Long.MAX_VALUE), randomBoolean(), "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
notifiedResponses.incrementAndGet();
}
@Override
public void onFailure(Exception e) {
notifiedResponses.incrementAndGet();
}
});
}
});
clientThreads[t].start();
}
barrier.arriveAndAwaitAdvance();
for (Thread t : clientThreads) {
t.join();
}
assertBusy(() -> assertThat(notifiedResponses.get(), equalTo(clientThreads.length * iterationsPerThread)));
shutdown.set(true);
masterThread.join();
}

private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
@@ -452,4 +539,61 @@ BytesReference serialize(Writeable writeable, Version version) throws IOExceptio
return out.bytes();
}
}

public void testCompositeListener() throws Exception {
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failureCount = new AtomicInteger();
Exception failure = randomBoolean() ? getSimulatedFailure() : null;
ShardStateAction.CompositeListener compositeListener = new ShardStateAction.CompositeListener(new ShardStateAction.Listener() {
@Override
public void onSuccess() {
successCount.incrementAndGet();
}
@Override
public void onFailure(Exception e) {
assertThat(e, sameInstance(failure));
failureCount.incrementAndGet();
}
});
int iterationsPerThread = scaledRandomIntBetween(100, 1000);
Thread[] threads = new Thread[between(1, 4)];
Phaser barrier = new Phaser(threads.length + 1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
barrier.arriveAndAwaitAdvance();
for (int n = 0; n < iterationsPerThread; n++) {
compositeListener.addListener(new ShardStateAction.Listener() {
@Override
public void onSuccess() {
successCount.incrementAndGet();
}
@Override
public void onFailure(Exception e) {
assertThat(e, sameInstance(failure));
failureCount.incrementAndGet();
}
});
}
});
threads[i].start();
}
barrier.arriveAndAwaitAdvance();
if (failure != null) {
compositeListener.onFailure(failure);
} else {
compositeListener.onSuccess();
}
for (Thread t : threads) {
t.join();
}
assertBusy(() -> {
if (failure != null) {
assertThat(successCount.get(), equalTo(0));
assertThat(failureCount.get(), equalTo(threads.length*iterationsPerThread + 1));
} else {
assertThat(successCount.get(), equalTo(threads.length*iterationsPerThread + 1));
assertThat(failureCount.get(), equalTo(0));
}
});
}
}