From d8feff5fed7c4767db6404bf68f338030b8403dc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 24 Jan 2019 08:50:23 +0100 Subject: [PATCH] Fix index filtering in follow info api. (#37752) The filtering by follower index was completely broken. Also the wrong persistent tasks were selected, causing the wrong status to be reported. Closes #37738 --- .../ccr/action/TransportFollowInfoAction.java | 26 ++-- .../elasticsearch/xpack/ccr/FollowInfoIT.java | 139 ++++++++++++++++++ .../TransportFollowInfoActionTests.java | 73 +++++++++ .../TransportFollowStatsActionTests.java | 2 +- 4 files changed, 228 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java index 4ad44ea8ce124..09d2c31c4c0b3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java @@ -67,19 +67,28 @@ protected void masterOperation(FollowInfoAction.Request request, List concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices())); + List followerInfos = getFollowInfos(concreteFollowerIndices, state); + listener.onResponse(new FollowInfoAction.Response(followerInfos)); + } + + @Override + protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + static List getFollowInfos(List concreteFollowerIndices, ClusterState state) { List followerInfos = new ArrayList<>(); PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - for (IndexMetaData indexMetaData : state.metaData()) { + for (String index : concreteFollowerIndices) { + IndexMetaData indexMetaData = state.metaData().index(index); Map ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); if (ccrCustomData != null) { Optional result; if (persistentTasks != null) { - result = persistentTasks.taskMap().values().stream() - .map(persistentTask -> (ShardFollowTask) persistentTask.getParams()) - .filter(shardFollowTask -> concreteFollowerIndices.isEmpty() || - concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName())) + result = persistentTasks.findTasks(ShardFollowTask.NAME, task -> true).stream() + .map(task -> (ShardFollowTask) task.getParams()) + .filter(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndexName())) .findAny(); } else { result = Optional.empty(); @@ -109,11 +118,6 @@ protected void masterOperation(FollowInfoAction.Request request, } } - listener.onResponse(new FollowInfoAction.Response(followerInfos)); - } - - @Override - protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + return followerInfos; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java new file mode 100644 index 0000000000000..478043a862b38 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.Comparator; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings; +import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class FollowInfoIT extends CcrSingleNodeTestCase { + + public void testFollowInfoApiFollowerIndexFiltering() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader2"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + followRequest = getPutFollowRequest("leader2", "follower2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowInfoAction.Request request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower1"); + FollowInfoAction.Response response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower2"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("_all"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex)); + assertThat(response.getFollowInfos().size(), equalTo(2)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue()); + + // Pause follower1 index and check the follower info api: + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower1"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED)); + assertThat(response.getFollowInfos().get(0).getParameters(), nullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower2"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("_all"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex)); + assertThat(response.getFollowInfos().size(), equalTo(2)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED)); + assertThat(response.getFollowInfos().get(0).getParameters(), nullValue()); + assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue()); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet()); + } + + public void testFollowInfoApiIndexMissing() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader2"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + followRequest = getPutFollowRequest("leader2", "follower2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowInfoAction.Request request1 = new FollowInfoAction.Request(); + request1.setFollowerIndices("follower3"); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request1).actionGet()); + + FollowInfoAction.Request request2 = new FollowInfoAction.Request(); + request2.setFollowerIndices("follower2", "follower3"); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request2).actionGet()); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet()); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java new file mode 100644 index 0000000000000..4a023e6ee3ed2 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.elasticsearch.xpack.ccr.action.TransportFollowStatsActionTests.createShardFollowTask; +import static org.hamcrest.Matchers.equalTo; + +public class TransportFollowInfoActionTests extends ESTestCase { + + public void testGetFollowInfos() { + ClusterState state = createCS( + new String[] {"follower1", "follower2", "follower3", "index4"}, + new boolean[]{true, true, true, false}, + new boolean[]{true, true, false, false} + ); + List concreteIndices = Arrays.asList("follower1", "follower3"); + + List result = TransportFollowInfoAction.getFollowInfos(concreteIndices, state); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(result.get(0).getStatus(), equalTo(Response.Status.ACTIVE)); + assertThat(result.get(1).getFollowerIndex(), equalTo("follower3")); + assertThat(result.get(1).getStatus(), equalTo(Response.Status.PAUSED)); + } + + private static ClusterState createCS(String[] indices, boolean[] followerIndices, boolean[] statuses) { + PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder(); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < indices.length; i++) { + String index = indices[i]; + boolean isFollowIndex = followerIndices[i]; + boolean active = statuses[i]; + + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(index) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0); + + if (isFollowIndex) { + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + if (active) { + persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null); + } + } + mdBuilder.put(imdBuilder); + } + + mdBuilder.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()); + return ClusterState.builder(new ClusterName("_cluster")) + .metaData(mdBuilder.build()) + .build(); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java index bc8c58f1de7de..b8f570e4ef4f6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java @@ -44,7 +44,7 @@ public void testFindFollowerIndicesFromShardFollowTasks() { assertThat(result.size(), equalTo(0)); } - private static ShardFollowTask createShardFollowTask(String followerIndex) { + static ShardFollowTask createShardFollowTask(String followerIndex) { return new ShardFollowTask( null, new ShardId(followerIndex, "", 0),