Skip to content

Commit

Permalink
Fix RestartIndexFollowingIT.testFollowIndex in case of fatal exception (
Browse files Browse the repository at this point in the history
#92522) (#92649)

This test failed several times after the leader cluster is fully restarted
for the second time. The logs indicate that one or more
ShardFollowNodeTask (the persistent task in charge or replication
operations for a shard) have been stopped because a fatal exception
occured.

The fatal exception is an IllegalStateException with the Unable to open
any connections to remote cluster message. I think this is due to the
leader cluster being slow to restart and the remote cluster sniff strategy
giving up after it tried to connect to the leader cluster nodes.

Since this exception is fatal, the ShardFollowNodeTask stopped to
replicate all operations and the test fails waiting for the number of docs
to match on leader and follower clusters.

The documented way to resolve CCR fatal exceptions for follower is
to recreate the follower or to pause/resume follower. Test has been
adjusted accordingly.

Closes #90666

Co-authored-by: David Turner <david.turner@elastic.co>

Co-authored-by: David Turner <david.turner@elastic.co>
  • Loading branch information
tlrx and DaveCTurner committed Jan 3, 2023
1 parent 1ccda38 commit cd55fd4
Showing 1 changed file with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -58,6 +60,7 @@ public void testFollowIndex() throws Exception {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
setupRemoteCluster();
assertRemoteClusterConnected();

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
if (randomBoolean()) {
Expand All @@ -80,27 +83,31 @@ public void testFollowIndex() throws Exception {
ensureFollowerGreen("index2");

final long secondBatchNumDocs = randomIntBetween(10, 200);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = 0; i < secondBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}

cleanRemoteCluster();
getLeaderCluster().fullRestart();
ensureLeaderGreen("index1");
// Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted:
setupRemoteCluster();

final long thirdBatchNumDocs = randomIntBetween(10, 200);
logger.info("Indexing [{}] docs as third batch", thirdBatchNumDocs);
for (int i = 0; i < thirdBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}

assertBusy(
() -> assertThat(
followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)
)
);
final long totalDocs = firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs;
final AtomicBoolean resumeAfterDisconnectionOnce = new AtomicBoolean(false);
assertBusy(() -> {
if (resumeAfterDisconnectionOnce.compareAndSet(false, true)) {
// Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted:
setupRemoteCluster();
}
assertRemoteClusterConnected();
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(totalDocs));
}, 30L, TimeUnit.SECONDS);

cleanRemoteCluster();
assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet());
Expand All @@ -120,6 +127,9 @@ private void setupRemoteCluster() throws Exception {
String address = getLeaderCluster().getAnyMasterNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

private void assertRemoteClusterConnected() throws Exception {
List<RemoteConnectionInfo> infos = followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertTrue(infos.get(0).isConnected());
Expand Down

0 comments on commit cd55fd4

Please sign in to comment.