From 12f7952db3702c14463a718d4a124833731e552c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 16 Dec 2018 13:33:50 +0100 Subject: [PATCH 1/3] Make CCR resilient against missing remote cluster connections Both index following and auto following should be resilient against missing remote connections. This happens in the case that they get accidentally removed by a user. When this happens auto following and index following will retry instead of failing with unrecoverable exceptions. Both the put follow and put auto follow APIs validate whether the remote cluster connection exists. The logic added in this change only handles the case where the remote connection gets removed during the lifetime of a follower index or auto follow pattern. This retry behaviour is similar to how CCR deals with authorization errors. Closes #36667 Closes #36255 --- .../ccr/action/AutoFollowCoordinator.java | 17 +++++- .../xpack/ccr/action/ShardFollowNodeTask.java | 8 +-- .../ccr/action/ShardFollowTasksExecutor.java | 8 ++- .../xpack/CcrSingleNodeTestCase.java | 5 +- .../xpack/ccr/LocalIndexFollowingIT.java | 52 +++++++++++++++++++ 5 files changed, 83 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 7900351105c06..d60e3f274cc30 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -207,11 +207,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } List removedRemoteClusters = new ArrayList<>(); - for (String remoteCluster : autoFollowers.keySet()) { + for (Map.Entry entry : autoFollowers.entrySet()) { + String remoteCluster = entry.getKey(); + AutoFollower autoFollower = entry.getValue(); boolean exist = 
autoFollowMetadata.getPatterns().values().stream() .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); if (exist == false) { removedRemoteClusters.add(remoteCluster); + } else if (autoFollower.remoteClusterConnectionMissing) { + LOGGER.info("Retrying auto follower [{}] after remote cluster connection was missing", remoteCluster); + autoFollower.remoteClusterConnectionMissing = false; + autoFollower.start(); } } this.autoFollowers = autoFollowers @@ -241,6 +247,7 @@ abstract static class AutoFollower { private final Supplier followerClusterStateSupplier; private volatile long metadataVersion = 0; + private volatile boolean remoteClusterConnectionMissing = false; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; @@ -284,6 +291,14 @@ void start() { autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); } else { assert remoteError != null; + String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]"; + if (remoteError instanceof IllegalArgumentException && + expectedErrorMessage.equals(remoteError.getMessage())) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster); + remoteClusterConnectionMissing = true; + return; + } + for (int i = 0; i < patterns.size(); i++) { String autoFollowPatternName = patterns.get(i); finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f213d5a4999f1..13b12d4b96f2b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -419,7 +419,7 @@ private void updateSettings(final LongConsumer handler, final 
AtomicInteger retr private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; - if (shouldRetry(e) && isStopped() == false) { + if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) { int currentRetry = retryCounter.incrementAndGet(); LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", params.getFollowShardId(), currentRetry), e); @@ -441,13 +441,14 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { return Math.min(backOffDelay, maxRetryDelayInMillis); } - static boolean shouldRetry(Exception e) { + static boolean shouldRetry(String remoteCluster, Exception e) { if (NetworkExceptionHelper.isConnectException(e)) { return true; } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { return true; } + String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; final Throwable actual = ExceptionsHelper.unwrapCause(e); return actual instanceof ShardNotFoundException || actual instanceof IllegalIndexShardStateException || @@ -460,7 +461,8 @@ static boolean shouldRetry(Exception e) { actual instanceof NodeDisconnectedException || actual instanceof NodeNotConnectedException || actual instanceof NodeClosedException || - (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); + (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || + (actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index bd22b85684ca4..63fd80e6d48e6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -244,7 +244,11 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setMaxOperationCount(maxOperationCount); request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setPollTimeout(params.getReadPollTimeout()); - remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + try { + remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } }; } @@ -274,7 +278,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } - if (ShardFollowNodeTask.shouldRetry(e)) { + if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 417de7cd985c5..1a74392f1ab3b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -58,10 +58,13 @@ public void setupLocalRemote() { } @After - public void remoteLocalRemote() throws Exception { + public void purgeCCRMetadata() throws Exception { ClusterService clusterService = getInstanceFromNode(ClusterService.class); removeCCRRelatedMetadataFromClusterState(clusterService); + } + @After + public void removeLocalRemote() { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); 
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null)); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index bbdf1a06354d3..80ba976a6bdb1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,17 +6,23 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; +import java.util.Collections; import java.util.Comparator; import java.util.Map; @@ -24,6 +30,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { @@ -119,6 +126,51 @@ public void 
testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false)); } + public void testRemoveRemoteConnection() throws Exception { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("my_pattern"); + request.setRemoteCluster("local"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + createIndex("logs-20200101", leaderIndexSettings); + client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet(); + assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + assertThat(response.getFollowStats().getStatsResponses().size(), equalTo(1)); + assertThat(response.getFollowStats().getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(0L)); + }); + + // Both auto follow patterns and index following should be resilient to remote connection being missing: + removeLocalRemote(); + // This triggers a cluster state update, which should let auto follow coordinator retry auto following: + setupLocalRemote(); + + // This new index should be picked up by auto follow coordinator + createIndex("logs-20200102", leaderIndexSettings); + // This new document should be replicated to follower index: + 
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet(); + assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(2L)); + + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[]{"copy-logs-20200101"}); + FollowStatsAction.StatsResponses responses = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(responses.getStatsResponses().size(), equalTo(1)); + assertThat(responses.getStatsResponses().get(0).status().getFatalException(), nullValue()); + assertThat(responses.getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(1L)); + }); + } + private String getIndexSettings(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { final String settings; From 4f406a315b4c9e1c30f7b1f24956d88eca668043 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 16 Dec 2018 14:06:38 +0100 Subject: [PATCH 2/3] unused import --- .../java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index ca2e24c8e2b3f..9287d92cc0e99 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.Comparator; import java.util.Map; import static java.util.Collections.singletonMap; From 9b3326470e0428a07ea84e952fa14ee912b0a8fe Mon Sep 17 00:00:00 2001 From: Martijn van 
Groningen Date: Tue, 18 Dec 2018 16:05:32 +0100 Subject: [PATCH 3/3] unmuted tests --- .../java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java | 2 -- .../java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index dd7795fbf0493..9287d92cc0e99 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -32,7 +31,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764") public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { public void testFollowIndex() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java index 61307dfd85dbd..bf6f080099088 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr.action; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -34,7 +33,6 @@ * Test 
scope is important to ensure that other tests added to this suite do not interfere with the expectation in * testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata. */ -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764") public class FollowStatsIT extends CcrSingleNodeTestCase { /**