diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 4888b0367fd20..57056bd61a553 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -230,11 +230,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } List removedRemoteClusters = new ArrayList<>(); - for (String remoteCluster : autoFollowers.keySet()) { + for (Map.Entry entry : autoFollowers.entrySet()) { + String remoteCluster = entry.getKey(); + AutoFollower autoFollower = entry.getValue(); boolean exist = autoFollowMetadata.getPatterns().values().stream() .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); if (exist == false) { removedRemoteClusters.add(remoteCluster); + } else if (autoFollower.remoteClusterConnectionMissing) { + LOGGER.info("Retrying auto follower [{}] after remote cluster connection was missing", remoteCluster); + autoFollower.remoteClusterConnectionMissing = false; + autoFollower.start(); } } this.autoFollowers = autoFollowers @@ -266,6 +272,7 @@ abstract static class AutoFollower { private volatile long lastAutoFollowTimeInMillis = -1; private volatile long metadataVersion = 0; + private volatile boolean remoteClusterConnectionMissing = false; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; @@ -312,6 +319,14 @@ void start() { autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); } else { assert remoteError != null; + String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]"; + if (remoteError instanceof IllegalArgumentException && + expectedErrorMessage.equals(remoteError.getMessage())) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster); + remoteClusterConnectionMissing = true; + return; + } + for (int i = 0; i < patterns.size(); i++) { String autoFollowPatternName = patterns.get(i); finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f213d5a4999f1..13b12d4b96f2b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -419,7 +419,7 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; - if (shouldRetry(e) && isStopped() == false) { + if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) { int currentRetry = retryCounter.incrementAndGet(); LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", params.getFollowShardId(), currentRetry), e); @@ -441,13 +441,14 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { return Math.min(backOffDelay, maxRetryDelayInMillis); } - static boolean shouldRetry(Exception e) { + static boolean shouldRetry(String remoteCluster, Exception e) { if (NetworkExceptionHelper.isConnectException(e)) { return true; } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { return true; } + String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; final Throwable actual = ExceptionsHelper.unwrapCause(e); return actual instanceof ShardNotFoundException || actual instanceof IllegalIndexShardStateException || @@ -460,7 +461,8 @@ static boolean shouldRetry(Exception e) { actual instanceof NodeDisconnectedException || actual instanceof NodeNotConnectedException || actual instanceof NodeClosedException || - (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); + (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || + (actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index bd22b85684ca4..63fd80e6d48e6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -244,7 +244,11 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setMaxOperationCount(maxOperationCount); request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setPollTimeout(params.getReadPollTimeout()); - remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + try { + remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } }; } @@ -274,7 +278,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } - if (ShardFollowNodeTask.shouldRetry(e)) { + if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index e858105dddf15..9287d92cc0e99 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,25 +6,31 @@ package org.elasticsearch.xpack.ccr; -import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; +import java.util.Collections; import java.util.Map; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764") public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { public void testFollowIndex() throws Exception { @@ -86,6 +92,51 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false)); } + public void testRemoveRemoteConnection() throws Exception { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("my_pattern"); + request.setRemoteCluster("local"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + createIndex("logs-20200101", leaderIndexSettings); + client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet(); + assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + assertThat(response.getFollowStats().getStatsResponses().size(), equalTo(1)); + assertThat(response.getFollowStats().getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(0L)); + }); + + // Both auto follow patterns and index following should be resilient to remote connection being missing: + removeLocalRemote(); + // This triggers a cluster state update, which should let auto follow coordinator retry auto following: + setupLocalRemote(); + + // This new index should be picked up by auto follow coordinator + createIndex("logs-20200102", leaderIndexSettings); + // This new document should be replicated to follower index: + client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet(); + assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(2L)); + + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[]{"copy-logs-20200101"}); + FollowStatsAction.StatsResponses responses = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(responses.getStatsResponses().size(), equalTo(1)); + assertThat(responses.getStatsResponses().get(0).status().getFatalException(), nullValue()); + assertThat(responses.getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(1L)); + }); + } + public static String getIndexSettings(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java index 61307dfd85dbd..bf6f080099088 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr.action; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -34,7 +33,6 @@ * Test scope is important to ensure that other tests added to this suite do not interfere with the expectation in * testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata. */ -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764") public class FollowStatsIT extends CcrSingleNodeTestCase { /**