Skip to content

Commit

Permalink
Allow CCR on nodes with legacy roles only (#60093)
Browse files Browse the repository at this point in the history
CCR will stop functioning if the master node is on 7.8, but data nodes 
are before that version because the master node considers that all data
nodes do not have the remote cluster client role. This commit allows CCR
work on data nodes with legacy roles only.

Relates #54146
Relates #59375
  • Loading branch information
dnhatn committed Aug 11, 2020
1 parent 1479a9e commit cb55f63
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.node;

import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -172,6 +173,12 @@ public Setting<Boolean> legacySetting() {
public static SortedSet<DiscoveryNodeRole> BUILT_IN_ROLES = Collections.unmodifiableSortedSet(
new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE)));

/**
* The version that {@link #REMOTE_CLUSTER_CLIENT_ROLE} is introduced. Nodes before this version do not have that role even
* they can connect to remote clusters.
*/
public static final Version REMOTE_CLUSTER_CLIENT_ROLE_VERSION = Version.V_7_8_0;

static SortedSet<DiscoveryNodeRole> LEGACY_ROLES =
Collections.unmodifiableSortedSet(new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -124,14 +125,18 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {

@Override
public Assignment getAssignment(final ShardFollowTask params, final ClusterState clusterState) {
final DiscoveryNode node = selectLeastLoadedNode(
clusterState,
DiscoveryNode selectedNode = selectLeastLoadedNode(clusterState,
((Predicate<DiscoveryNode>) DiscoveryNode::isDataNode).and(DiscoveryNode::isRemoteClusterClient)
);
if (node == null) {
if (selectedNode == null) {
// best effort as nodes before 7.8 might not be able to connect to remote clusters
selectedNode = selectLeastLoadedNode(clusterState,
node -> node.isDataNode() && node.getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
}
if (selectedNode == null) {
return NO_ASSIGNMENT;
} else {
return new Assignment(node.getId(), "node is the least loaded data node and remote cluster client");
return new Assignment(selectedNode.getId(), "node is the least loaded data node and remote cluster client");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider");
}
if (node.node().isRemoteClusterClient() == false) {
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
if (node.node().isRemoteClusterClient()) {
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
if (node.node().getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION)) {
return allocation.decision(Decision.YES, NAME, "shard is a primary follower and node has only the legacy roles");
}
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;

Expand All @@ -38,9 +40,13 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {

public void testAssignmentToNodeWithDataAndRemoteClusterClientRoles() {
runAssignmentTest(
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)),
randomIntBetween(0, 8),
() -> new HashSet<>(randomSubsetOf(new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE)))),
newNode(
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE),
VersionUtils.randomVersion(random())),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
Expand All @@ -56,11 +62,26 @@ public void testRemoteClusterClientRoleWithoutDataRole() {
runNoAssignmentTest(Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
}

public void testNodeWithLegacyRolesOnly() {
final Version oldVersion = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
runAssignmentTest(
newNode(Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), oldVersion),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
}
);
}

private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
runAssignmentTest(
roles,
0,
Collections::emptySet,
newNode(roles, Version.CURRENT),
Collections.emptySet(),
(theSpecial, assignment) -> {
assertFalse(assignment.isAssigned());
assertThat(assignment.getExplanation(), equalTo("no nodes found with data and remote cluster client roles"));
Expand All @@ -69,9 +90,8 @@ private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
}

private void runAssignmentTest(
final Set<DiscoveryNodeRole> theSpecialRoles,
final int numberOfOtherNodes,
final Supplier<Set<DiscoveryNodeRole>> otherNodesRolesSupplier,
final DiscoveryNode targetNode,
final Set<DiscoveryNode> otherNodes,
final BiConsumer<DiscoveryNode, Assignment> consumer
) {
final ClusterService clusterService = mock(ClusterService.class);
Expand All @@ -82,25 +102,30 @@ private void runAssignmentTest(
final ShardFollowTasksExecutor executor =
new ShardFollowTasksExecutor(mock(Client.class), mock(ThreadPool.class), clusterService, settingsModule);
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test"));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
final DiscoveryNode theSpecial = newNode(theSpecialRoles);
nodesBuilder.add(theSpecial);
for (int i = 0; i < numberOfOtherNodes; i++) {
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(targetNode);
for (DiscoveryNode node : otherNodes) {
nodesBuilder.add(node);
}
clusterStateBuilder.nodes(nodesBuilder);
final Assignment assignment = executor.getAssignment(mock(ShardFollowTask.class), clusterStateBuilder.build());
consumer.accept(theSpecial, assignment);
consumer.accept(targetNode, assignment);
}

private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles) {
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles, final Version version) {
return new DiscoveryNode(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
roles,
Version.CURRENT
version
);
}

private static Set<DiscoveryNode> newNodes(int numberOfNodes, Supplier<Set<DiscoveryNodeRole>> rolesSupplier, Version version) {
Set<DiscoveryNode> nodes = new HashSet<>();
for (int i = 0; i < numberOfNodes; i++) {
nodes.add(newNode(rolesSupplier.get(), version));
}
return nodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.ArrayList;
Expand All @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -151,9 +153,10 @@ public void testBootstrappingFollowerIndex() {
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1).numberOfReplicas(1);
DiscoveryNode dataOnlyNode = newNode("d1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
DiscoveryNode dataAndRemoteNode = newNode("dr1",
final DiscoveryNode dataOnlyNode = newNode("data_role_only", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
final DiscoveryNode dataAndRemoteNode = newNode("data_and_remote_cluster_client_role",
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final DiscoveryNode nodeWithLegacyRolesOnly = newNodeWithLegacyRoles("legacy_roles_only");
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(dataOnlyNode).add(dataAndRemoteNode).build();
Metadata metadata = Metadata.builder().put(indexMetadata).build();
RoutingTable.Builder routingTable = RoutingTable.builder()
Expand All @@ -171,6 +174,11 @@ public void testBootstrappingFollowerIndex() {
Decision yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), dataAndRemoteNode);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has the remote_cluster_client role"));

yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), nodeWithLegacyRolesOnly);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has only the legacy roles"));

for (ShardRouting replica : shardRouting.replicaShards()) {
assertThat(replica.state(), equalTo(UNASSIGNED));
yesDecision = executeAllocation(clusterState, replica, randomFrom(dataOnlyNode, dataAndRemoteNode));
Expand All @@ -181,6 +189,12 @@ public void testBootstrappingFollowerIndex() {
}
}

static DiscoveryNode newNodeWithLegacyRoles(String id) {
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
return new DiscoveryNode(id, buildNewFakeTransportAddress(), emptyMap(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), version);
}

static Decision executeAllocation(ClusterState clusterState, ShardRouting shardRouting, DiscoveryNode node) {
final AllocationDecider decider = new CcrPrimaryFollowerAllocationDecider();
final RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Collections.singletonList(decider)),
Expand Down

0 comments on commit cb55f63

Please sign in to comment.