Skip to content

Commit

Permalink
Increase timeout in testFollowIndexWithConcurrentMappingChanges (#60534)
Browse files Browse the repository at this point in the history
The test failed because the leader was using a lot of CPU to process
many mapping updates. This commit reduces the number of mapping updates,
increases the timeout, and adds more debug info.

Closes #59832
  • Loading branch information
dnhatn committed Aug 11, 2020
1 parent 1633972 commit 4b9e986
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,38 +249,23 @@ public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get();
}

AtomicBoolean isRunning = new AtomicBoolean(true);

// Concurrently index new docs with mapping changes
int numFields = between(10, 20);
Thread thread = new Thread(() -> {
int docID = 10000;
char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
for (char c : chars) {
int numDocs = between(10, 200);
for (int i = 0; i < numDocs; i++) {
if (isRunning.get() == false) {
break;
}
final String source;
long valueToPutInDoc = randomLongBetween(0, 50000);
if (randomBoolean()) {
source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc);
} else {
source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc);
}
for (int i = 1; i < 10; i++) {
if (isRunning.get() == false) {
break;
}
leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
}
}
if (between(0, 100) < 20) {
leaderClient().admin().indices().prepareFlush("index1").setForce(false).setWaitIfOngoing(false).get();
final String field = "f-" + between(1, numFields);
leaderClient().prepareIndex("index1", "_doc").setSource(field, between(0, 1000)).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(false).setForce(false).get();
}
}
});
Expand All @@ -298,16 +283,14 @@ public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get();
}

for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
for (int i = 0; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i), 1, TimeUnit.MINUTES);
}

isRunning.set(false);
thread.join();
assertIndexFullyReplicatedToFollower("index1", "index2");
}

public void testFollowIndexWithoutWaitForComplete() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
Expand Down Expand Up @@ -367,7 +368,7 @@ protected final ClusterHealthStatus ensureFollowerGreen(String... indices) {

protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) {
logger.info("ensure green follower indices {}", Arrays.toString(indices));
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30),
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(60),
waitForNoInitializingShards, indices);
}

Expand All @@ -389,10 +390,21 @@ private ClusterHealthStatus ensureColor(TestCluster testCluster,

ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet();
if (actionGet.isTimedOut()) {
logger.info("{} timed out, cluster state:\n{}\n{}",
logger.info("{} timed out: " +
"\nleader cluster state:\n{}" +
"\nleader cluster hot threads:\n{}" +
"\nleader cluster tasks:\n{}" +
"\nfollower cluster state:\n{}" +
"\nfollower cluster hot threads:\n{}" +
"\nfollower cluster tasks:\n{}",
method,
testCluster.client().admin().cluster().prepareState().get().getState(),
testCluster.client().admin().cluster().preparePendingClusterTasks().get());
leaderClient().admin().cluster().prepareState().get().getState(),
getHotThreads(leaderClient()),
leaderClient().admin().cluster().preparePendingClusterTasks().get(),
followerClient().admin().cluster().prepareState().get().getState(),
getHotThreads(followerClient()),
followerClient().admin().cluster().preparePendingClusterTasks().get()
);
fail("timed out waiting for " + color + " state");
}
assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
Expand All @@ -401,6 +413,11 @@ private ClusterHealthStatus ensureColor(TestCluster testCluster,
return actionGet.getStatus();
}

/**
 * Builds a single string containing the hot-threads report of every node that
 * answers the nodes-hot-threads request issued through {@code client}.
 * The request is configured with a thread limit of 99999 and with idle-thread
 * filtering switched off; the per-node reports are separated by a newline.
 *
 * @param client the client used to send the hot-threads request
 * @return the per-node hot-threads reports joined with {@code "\n"}
 */
static String getHotThreads(Client client) {
    final StringBuilder report = new StringBuilder();
    boolean needsSeparator = false;
    for (NodeHotThreads nodeReport : client.admin()
        .cluster()
        .prepareNodesHotThreads()
        .setThreads(99999)
        .setIgnoreIdleThreads(false)
        .get()
        .getNodes()) {
        // Put the separator between entries only, never before the first or after the last.
        if (needsSeparator) {
            report.append("\n");
        }
        report.append(nodeReport.getHotThreads());
        needsSeparator = true;
    }
    return report.toString();
}

protected final Index resolveLeaderIndex(String index) {
GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
Expand Down

0 comments on commit 4b9e986

Please sign in to comment.