Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CCR resilient against missing remote cluster connections #36682

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -230,11 +230,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

List<String> removedRemoteClusters = new ArrayList<>();
for (String remoteCluster : autoFollowers.keySet()) {
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
String remoteCluster = entry.getKey();
AutoFollower autoFollower = entry.getValue();
boolean exist = autoFollowMetadata.getPatterns().values().stream()
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
if (exist == false) {
removedRemoteClusters.add(remoteCluster);
} else if (autoFollower.remoteClusterConnectionMissing) {
LOGGER.info("Retrying auto follower [{}] after remote cluster connection was missing", remoteCluster);
autoFollower.remoteClusterConnectionMissing = false;
autoFollower.start();
}
}
this.autoFollowers = autoFollowers
Expand Down Expand Up @@ -266,6 +272,7 @@ abstract static class AutoFollower {

private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0;
private volatile boolean remoteClusterConnectionMissing = false;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

Expand Down Expand Up @@ -312,6 +319,14 @@ void start() {
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
} else {
assert remoteError != null;
String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]";
if (remoteError instanceof IllegalArgumentException &&
expectedErrorMessage.equals(remoteError.getMessage())) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster);
remoteClusterConnectionMissing = true;
return;
}

for (int i = 0; i < patterns.size(); i++) {
String autoFollowPatternName = patterns.get(i);
finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));
Expand Down
Expand Up @@ -419,7 +419,7 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
if (shouldRetry(e) && isStopped() == false) {
if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) {
int currentRetry = retryCounter.incrementAndGet();
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
params.getFollowShardId(), currentRetry), e);
Expand All @@ -441,13 +441,14 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
return Math.min(backOffDelay, maxRetryDelayInMillis);
}

static boolean shouldRetry(Exception e) {
static boolean shouldRetry(String remoteCluster, Exception e) {
if (NetworkExceptionHelper.isConnectException(e)) {
return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
return true;
}

String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
Expand All @@ -460,7 +461,8 @@ static boolean shouldRetry(Exception e) {
actual instanceof NodeDisconnectedException ||
actual instanceof NodeNotConnectedException ||
actual instanceof NodeClosedException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed"));
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
(actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage()));
}

// These methods are protected for testing purposes:
Expand Down
Expand Up @@ -244,7 +244,11 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setMaxOperationCount(maxOperationCount);
request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout());
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
try {
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
}
};
}
Expand Down Expand Up @@ -274,7 +278,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
return;
}

if (ShardFollowNodeTask.shouldRetry(e)) {
if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state));
Expand Down
Expand Up @@ -6,25 +6,31 @@

package org.elasticsearch.xpack.ccr;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {

public void testFollowIndex() throws Exception {
Expand Down Expand Up @@ -86,6 +92,51 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
}

public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");
request.setRemoteCluster("local");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
request.setFollowIndexNamePattern("copy-{{leader_index}}");
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());

Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createIndex("logs-20200101", leaderIndexSettings);
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(1L));
assertThat(response.getFollowStats().getStatsResponses().size(), equalTo(1));
assertThat(response.getFollowStats().getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(0L));
});

// Both auto follow patterns and index following should be resilient to remote connection being missing:
removeLocalRemote();
// This triggers a cluster state update, which should let auto follow coordinator retry auto following:
setupLocalRemote();

// This new index should be picked up by auto follow coordinator
createIndex("logs-20200102", leaderIndexSettings);
// This new document should be replicated to follower index:
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(2L));

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[]{"copy-logs-20200101"});
FollowStatsAction.StatsResponses responses = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(responses.getStatsResponses().size(), equalTo(1));
assertThat(responses.getStatsResponses().get(0).status().getFatalException(), nullValue());
assertThat(responses.getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(1L));
});
}

public static String getIndexSettings(final int numberOfShards,
final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
Expand Down
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.ccr.action;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
Expand Down Expand Up @@ -34,7 +33,6 @@
* Test scope is important to ensure that other tests added to this suite do not interfere with the expectation in
* testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class FollowStatsIT extends CcrSingleNodeTestCase {

/**
Expand Down