From 8ada485e43fd1ca8d3663b25e4d295553a58d5db Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 4 Feb 2019 18:41:51 -0600 Subject: [PATCH] Add test for `PutFollowAction` on a closed index (#38236) (#38377) This is related to #35975. Currently when an index falls behind a leader it encounters a fatal exception. This commit adds a test for that scenario. Additionally, it tests that the user can stop following, close the follower index, and put follow again. After the indexing is re-bootstrapped, it will recover the documents it lost in normal following operations. --- .../xpack/ccr/IndexFollowingIT.java | 109 +++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index b6b45b2b2c635..6fa5eec3ccaaa 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,6 +8,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -16,10 +19,13 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @@ -53,6 +59,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.CcrIntegTestCase; @@ -75,6 +82,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,6 +94,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -943,6 +952,98 @@ public void testUpdateAnalysisLeaderIndexSettings() throws Exception { assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true)); } + public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); + expectThrows(SnapshotRestoreException.class, + () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + + followerClient().admin().indices().prepareClose("index2").get(); + expectThrows(ResourceAlreadyExistsException.class, + () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + } + + public void testIndexFallBehind() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int numDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", numDocs); + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + assertIndexFullyReplicatedToFollower("index1", "index2"); + for (int i = 0; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + pauseFollow("index2"); + + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + leaderClient().prepareDelete("index1", "doc", "1").get(); + leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet(); + leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1"); + forceMergeRequest.maxNumSegments(1); + leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet(); + + followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); + + assertBusy(() -> { + List statuses = getFollowTaskStatuses("index2"); + Set exceptions = statuses.stream() + .map(ShardFollowNodeTaskStatus::getFatalException) + .filter(Objects::nonNull) + .map(ExceptionsHelper::unwrapCause) + .filter(e -> e instanceof ResourceNotFoundException) + .map(e -> (ResourceNotFoundException) e) + .filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing")) + .collect(Collectors.toSet()); + assertThat(exceptions.size(), greaterThan(0)); + }); + + followerClient().admin().indices().prepareClose("index2").get(); + pauseFollow("index2"); + + + final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); + PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get(); + assertTrue(response2.isFollowIndexCreated()); + assertTrue(response2.isFollowIndexShardsAcked()); + assertTrue(response2.isIndexFollowingStarted()); + + ensureFollowerGreen("index2"); + assertIndexFullyReplicatedToFollower("index1", "index2"); + for (int i = 2; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i, i * 2)); + } + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { @@ -1028,9 +1129,13 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f } private CheckedRunnable assertExpectedDocumentRunnable(final int value) { + return assertExpectedDocumentRunnable(value, value); + } + + private CheckedRunnable assertExpectedDocumentRunnable(final int key, final int value) { return () -> { - final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get(); - assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists()); + final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(key)).get(); + assertTrue("Doc with id [" + key + "] is missing", getResponse.isExists()); assertTrue((getResponse.getSource().containsKey("f"))); assertThat(getResponse.getSource().get("f"), equalTo(value)); };