Skip to content

Commit

Permalink
Handle failure to get remote client in unfollow (#71928)
Browse files Browse the repository at this point in the history
We remove the retention leases from the leader cluster after processing
the cluster state update in the unfollow action, but today we assume
that we're still connected to the leader cluster when doing so. If the
leader cluster has been removed then `Client#getRemoteClusterClient`
throws an exception, which means the listener is never notified of the
failure.

This commit addresses this by catching the exception, logging a warning,
and passing the exception to the listener so that it is reported back to the caller of the unfollow action.

Closes #71885
  • Loading branch information
DaveCTurner committed Apr 20, 2021
1 parent e9f2c21 commit 4d7c8c8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
Expand Up @@ -103,7 +103,6 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
final IndexMetaData indexMetaData = oldState.metaData().index(request.getFollowerIndex());
final Map<String, String> ccrCustomMetaData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String remoteClusterName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);
final Client remoteClient = client.getRemoteClusterClient(remoteClusterName);
final String leaderIndexName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String leaderIndexUuid = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid);
Expand All @@ -114,6 +113,14 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
leaderIndex);
final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings());

final Client remoteClient;
try {
remoteClient = client.getRemoteClusterClient(remoteClusterName);
} catch (Exception e) {
onLeaseRemovalFailure(indexMetaData.getIndex(), retentionLeaseId, e);
return;
}

final GroupedActionListener<RetentionLeaseActions.Response> groupListener = new GroupedActionListener<>(
new ActionListener<Collection<RetentionLeaseActions.Response>>() {

Expand All @@ -128,16 +135,8 @@ public void onResponse(final Collection<RetentionLeaseActions.Response> response

@Override
public void onFailure(final Exception e) {
logger.warn(new ParameterizedMessage(
"[{}] failure while removing retention lease [{}] on leader primary shards",
indexMetaData.getIndex(),
retentionLeaseId),
e);
final ElasticsearchException wrapper = new ElasticsearchException(e);
wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId);
listener.onFailure(wrapper);
onLeaseRemovalFailure(indexMetaData.getIndex(), retentionLeaseId, e);
}

},
numberOfShards,
Collections.emptyList());
Expand All @@ -160,6 +159,17 @@ public void onFailure(final Exception e) {
}
}

/**
 * Reports a failure to remove a retention lease: logs a warning that names the
 * index and the lease, then fails the listener with an {@link ElasticsearchException}
 * that wraps the original exception and carries the lease id under the
 * {@code es.failed_to_remove_retention_leases} metadata key.
 *
 * @param index            the index reported in the warning message
 * @param retentionLeaseId the id of the retention lease that could not be removed
 * @param e                the underlying failure; logged and used as the cause of the wrapper
 */
private void onLeaseRemovalFailure(Index index, String retentionLeaseId, Exception e) {
    // Build the log message up front so the logging call itself stays on one line.
    final ParameterizedMessage warning = new ParameterizedMessage(
        "[{}] failure while removing retention lease [{}] on leader primary shards",
        index,
        retentionLeaseId);
    logger.warn(warning, e);

    // Wrap the cause and tag it with the lease id before handing it to the listener.
    final ElasticsearchException failure = new ElasticsearchException(e);
    failure.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId);
    listener.onFailure(failure);
}

private void removeRetentionLeaseForShard(
final ShardId followerShardId,
final ShardId leaderShardId,
Expand Down
Expand Up @@ -6,25 +6,32 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.List;
import java.util.Locale;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;

public class RestartIndexFollowingIT extends CcrIntegTestCase {

Expand Down Expand Up @@ -92,10 +99,19 @@ public void testFollowIndex() throws Exception {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}

assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(),
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
});
assertBusy(() -> assertThat(
followerClient().prepareSearch("index2").get().getHits().getTotalHits(),
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)));

cleanRemoteCluster();
assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet());
assertAcked(followerClient().admin().indices().prepareClose("index2"));

final ActionFuture<AcknowledgedResponse> unfollowFuture
= followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2"));
final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, unfollowFuture::actionGet);
assertThat(elasticsearchException.getMessage(), containsString("no such remote cluster"));
assertThat(elasticsearchException.getMetadataKeys(), hasItem("es.failed_to_remove_retention_leases"));
}

private void setupRemoteCluster() throws Exception {
Expand Down

0 comments on commit 4d7c8c8

Please sign in to comment.