diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 4dbc1933edef3..f1e865decb6ea 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction; import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; @@ -17,13 +20,17 @@ import org.elasticsearch.transport.RemoteConnectionStrategy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.List; import java.util.Locale; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; public class RestartIndexFollowingIT extends CcrIntegTestCase { @@ -88,10 +95,19 @@ public void testFollowIndex() throws Exception { leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); } - assertBusy(() -> { - assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, - equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); - }); + assertBusy(() -> assertThat( + followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, + equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs))); + + cleanRemoteCluster(); + assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet()); + assertAcked(followerClient().admin().indices().prepareClose("index2")); + + final ActionFuture unfollowFuture + = followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")); + final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, unfollowFuture::actionGet); + assertThat(elasticsearchException.getMessage(), containsString("no such remote cluster")); + assertThat(elasticsearchException.getMetadataKeys(), hasItem("es.failed_to_remove_retention_leases")); } private void setupRemoteCluster() throws Exception { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 33e49a67bbda0..e9d7d5f38d823 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -97,7 +97,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta final IndexMetadata indexMetadata = oldState.metadata().index(request.getFollowerIndex()); final Map ccrCustomMetadata = indexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); final String remoteClusterName = ccrCustomMetadata.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY); - final Client remoteClient = client.getRemoteClusterClient(remoteClusterName); + final String leaderIndexName = ccrCustomMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); final String leaderIndexUuid = ccrCustomMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid); @@ -108,6 +108,14 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta leaderIndex); final int numberOfShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetadata.getSettings()); + final Client remoteClient; + try { + remoteClient = client.getRemoteClusterClient(remoteClusterName); + } catch (Exception e) { + onLeaseRemovalFailure(indexMetadata.getIndex(), retentionLeaseId, e); + return; + } + final GroupedActionListener groupListener = new GroupedActionListener<>( new ActionListener>() { @@ -122,16 +130,8 @@ public void onResponse(final Collection responses) { @Override public void onFailure(final Exception e) { - logger.warn(new ParameterizedMessage( - "[{}] failure while removing retention lease [{}] on leader primary shards", - indexMetadata.getIndex(), - retentionLeaseId), - e); - final ElasticsearchException wrapper = new ElasticsearchException(e); - wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId); - listener.onFailure(wrapper); + onLeaseRemovalFailure(indexMetadata.getIndex(), retentionLeaseId, e); } - }, numberOfShards ); @@ -154,6 +154,17 @@ public void onFailure(final Exception e) { } } + private void onLeaseRemovalFailure(Index index, String retentionLeaseId, Exception e) { + logger.warn(new ParameterizedMessage( + "[{}] failure while removing retention lease [{}] on leader primary shards", + index, + retentionLeaseId), + e); + final ElasticsearchException wrapper = new ElasticsearchException(e); + wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId); + listener.onFailure(wrapper); + } + private void removeRetentionLeaseForShard( final ShardId followerShardId, final ShardId leaderShardId,