Skip to content

Commit

Permalink
[CCR] AutoFollowCoordinator and follower index already created (#36540)
Browse files Browse the repository at this point in the history
The AutoFollowCoordinator should be resilient to the fact that the follower
index has already been created and in that case it should only update
the auto follow metadata with the fact that the follower index was created.

Relates to #33007
  • Loading branch information
martijnvg committed Dec 24, 2018
1 parent 44fe265 commit 561b704
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
remoteClusterStateResponse -> {
remoteClusterStateResponse -> {
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
Expand Down Expand Up @@ -365,8 +366,8 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);

final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState,
clusterState, followedIndices);
final List<Index> leaderIndicesToFollow =
getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName));
} else {
Expand All @@ -379,7 +380,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,

Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameRemoteCluster, remoteClusterState.metaData(), resultHandler);
patternsForTheSameRemoteCluster, remoteClusterState.metaData(), clusterState.metaData(), resultHandler);
}
i++;
}
Expand All @@ -393,6 +394,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
Map<String, String> headers,
List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster,
MetaData remoteMetadata,
MetaData localMetadata,
Consumer<AutoFollowResult> resultHandler) {

final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
Expand Down Expand Up @@ -430,7 +432,16 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
}
});
continue;
} else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) {
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
continue;
}

followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
Expand All @@ -441,6 +452,25 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
}
}

private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPattern,
Index leaderIndex,
MetaData localMetadata) {
String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndex.getName());
IndexMetaData indexMetaData = localMetadata.index(followIndexName);
if (indexMetaData != null) {
// If an index with the same name exists, but it is not a follow index for this leader index then
// we should let the auto follower attempt to auto follow it, so it can fail later and
// it is then visible in the auto follow stats. For example a cluster can just happen to have
// an index with the same name as the new follower index.
Map<String, String> customData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
if (customData != null) {
String recordedLeaderIndexUUID = customData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
return leaderIndex.getUUID().equals(recordedLeaderIndexUUID);
}
}
return false;
}

private void followLeaderIndex(String autoFollowPattenName,
String remoteCluster,
Index indexToFollow,
Expand Down Expand Up @@ -492,7 +522,6 @@ private void finalise(int slot, AutoFollowResult result) {

static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
ClusterState remoteClusterState,
ClusterState followerClusterState,
List<String> followedIndexUUIDs) {
List<Index> leaderIndicesToFollow = new ArrayList<>();
for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
Expand All @@ -505,10 +534,7 @@ static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
// this index will be auto followed.
indexRoutingTable.allPrimaryShardsActive() &&
followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData
// has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable
// If so then handle it differently: not follow it, but just add an entry to
// AutoFollowMetadata#followedLeaderIndexUUIDs

leaderIndicesToFollow.add(leaderIndexMetaData.getIndex());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
Expand Down Expand Up @@ -345,8 +346,7 @@ public void testGetLeaderIndicesToFollow() {
.routingTable(routingTableBuilder.build())
.build();

List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
Collections.emptyList());
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
result.sort(Comparator.comparing(Index::getName));
assertThat(result.size(), equalTo(5));
assertThat(result.get(0).getName(), equalTo("metrics-0"));
Expand All @@ -356,7 +356,7 @@ public void testGetLeaderIndicesToFollow() {
assertThat(result.get(4).getName(), equalTo("metrics-4"));

List<String> followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID());
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs);
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs);
result.sort(Comparator.comparing(Index::getName));
assertThat(result.size(), equalTo(4));
assertThat(result.get(0).getName(), equalTo("metrics-0"));
Expand Down Expand Up @@ -390,8 +390,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
.routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
.build();

List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
Collections.emptyList());
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getName(), equalTo("index1"));

Expand All @@ -404,7 +403,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
.routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
.build();

result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList());
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
assertThat(result.size(), equalTo(2));
result.sort(Comparator.comparing(Index::getName));
assertThat(result.get(0).getName(), equalTo("index1"));
Expand Down Expand Up @@ -864,6 +863,83 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> pa
"because soft deletes are not enabled"));
}

public void testAutoFollowerFollowerIndexAlreadyExists() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);

ClusterState remoteState = createRemoteClusterState("logs-20190101", true);

AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);

ClusterState currentState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder()
.put(IndexMetaData.builder("logs-20190101")
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Collections.singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY,
remoteState.metaData().index("logs-20190101").getIndexUUID()))
.numberOfShards(1)
.numberOfReplicas(0))
.putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();


final Object[] resultHolder = new Object[1];
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results -> {
resultHolder[0] = results;
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote"));
handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null);
}

@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
fail("this should not be invoked");
}

@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
ClusterState resultCs = updateFunction.apply(currentState);
AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE);
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1));
handler.accept(null);
}

@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
}
};
autoFollower.start();

@SuppressWarnings("unchecked")
List<AutoFollowCoordinator.AutoFollowResult> results = (List<AutoFollowCoordinator.AutoFollowResult>) resultHolder[0];
assertThat(results, notNullValue());
assertThat(results.size(), equalTo(1));
assertThat(results.get(0).clusterStateFetchException, nullValue());
List<Map.Entry<Index, Exception>> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet());
assertThat(entries.size(), equalTo(1));
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), nullValue());
}

private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
Settings.Builder indexSettings;
if (enableSoftDeletes != null) {
Expand Down

0 comments on commit 561b704

Please sign in to comment.