Skip to content

Commit

Permalink
Add test for PutFollowAction on a closed index (elastic#38236) (ela…
Browse files Browse the repository at this point in the history
…stic#38377)

This is related to elastic#35975. Currently when an index falls behind a leader
it encounters a fatal exception. This commit adds a test for that
scenario. Additionally, it tests that the user can stop following, close
the follower index, and put follow again. After the indexing is
re-bootstrapped, it will recover the documents it lost in normal
following operations.
  • Loading branch information
Tim-Brooks committed Feb 5, 2019
1 parent f63cbdb commit 8ada485
Showing 1 changed file with 107 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
Expand All @@ -16,10 +19,13 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
Expand Down Expand Up @@ -53,6 +59,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.CcrIntegTestCase;
Expand All @@ -75,6 +82,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -86,6 +94,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -943,6 +952,98 @@ public void testUpdateAnalysisLeaderIndexSettings() throws Exception {
assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true));
}

public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());

final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
expectThrows(SnapshotRestoreException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());

followerClient().admin().indices().prepareClose("index2").get();
expectThrows(ResourceAlreadyExistsException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet());
}

public void testIndexFallBehind() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");

final int numDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", numDocs);
for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());

assertIndexFullyReplicatedToFollower("index1", "index2");
for (int i = 0; i < numDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}

pauseFollow("index2");

for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
leaderClient().prepareDelete("index1", "doc", "1").get();
leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet();
leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet();
ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1");
forceMergeRequest.maxNumSegments(1);
leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet();

followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();

assertBusy(() -> {
List<ShardFollowNodeTaskStatus> statuses = getFollowTaskStatuses("index2");
Set<ResourceNotFoundException> exceptions = statuses.stream()
.map(ShardFollowNodeTaskStatus::getFatalException)
.filter(Objects::nonNull)
.map(ExceptionsHelper::unwrapCause)
.filter(e -> e instanceof ResourceNotFoundException)
.map(e -> (ResourceNotFoundException) e)
.filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing"))
.collect(Collectors.toSet());
assertThat(exceptions.size(), greaterThan(0));
});

followerClient().admin().indices().prepareClose("index2").get();
pauseFollow("index2");


final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get();
assertTrue(response2.isFollowIndexCreated());
assertTrue(response2.isFollowIndexShardsAcked());
assertTrue(response2.isIndexFollowingStarted());

ensureFollowerGreen("index2");
assertIndexFullyReplicatedToFollower("index1", "index2");
for (int i = 2; i < numDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i, i * 2));
}
}

private long getFollowTaskSettingsVersion(String followerIndex) {
long settingsVersion = -1L;
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
Expand Down Expand Up @@ -1028,9 +1129,13 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
}

private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return assertExpectedDocumentRunnable(value, value);
}

private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int key, final int value) {
return () -> {
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(key)).get();
assertTrue("Doc with id [" + key + "] is missing", getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("f")));
assertThat(getResponse.getSource().get("f"), equalTo(value));
};
Expand Down

0 comments on commit 8ada485

Please sign in to comment.