Skip to content

Commit

Permalink
[7.5][Transform] prevent assignment if any node is older than 7.4 (#4…
Browse files Browse the repository at this point in the history
…8055)

disable task assignment of transforms if any node uses version 7.2 or 7.2 (mixed cluster).

fixes #48019
  • Loading branch information
Hendrik Muhs committed Oct 15, 2019
1 parent f0cb43f commit b2ce728
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -33,18 +34,18 @@
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -99,8 +100,18 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(TransformTaskParam
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}

// see gh#48019 disable assignment if any node is using 7.2 or 7.3
if (clusterState.getNodes().getMinNodeVersion().before(Version.V_7_4_0)) {
String reason = "Not starting transform [" + params.getId() + "], " +
"because cluster contains nodes with version older than 7.4.0";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}

DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
node.isDataNode() &&
node.getVersion().onOrAfter(params.getVersion())
);
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testNodeVersionAssignment() {
buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
Version.V_7_2_0))
Version.V_7_4_0))
.add(new DiscoveryNode("current-data-node-with-2-tasks",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Expand Down Expand Up @@ -123,6 +123,83 @@ transformCheckpointService, mock(SchedulerEngine.class),
equalTo("past-data-node-1"));
}

public void testDoNotSelectOldNodes() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addIndices(metaData, routingTable);
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
.addTask("transform-task-1",
TransformTaskParams.NAME,
new TransformTaskParams("transform-task-1", Version.CURRENT, null),
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-task", ""));

PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();

metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("old-data-node-1",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
Version.V_7_2_0))
.add(new DiscoveryNode("current-data-node-with-1-task",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
Version.CURRENT))
.add(new DiscoveryNode("non-data-node-1",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
Version.CURRENT));

ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes);
csBuilder.routingTable(routingTable.build());
csBuilder.metaData(metaData);

ClusterState cs = csBuilder.build();
Client client = mock(Client.class);
TransformAuditor mockAuditor = mock(TransformAuditor.class);
TransformConfigManager transformsConfigManager = new TransformConfigManager(client, xContentRegistry());
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(client,
transformsConfigManager, mockAuditor);
ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
Collections.singleton(TransformTask.NUM_FAILURE_RETRIES_SETTING));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(cSettings);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor(client,
transformsConfigManager,
transformCheckpointService, mock(SchedulerEngine.class),
new TransformAuditor(client, ""),
mock(ThreadPool.class),
clusterService,
Settings.EMPTY);

// old-data-node-1 prevents assignment
assertNull(executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode());

// remove the old 7.2 node
nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("current-data-node-with-1-task",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
Version.CURRENT))
.add(new DiscoveryNode("non-data-node-1",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
Version.CURRENT));
csBuilder.nodes(nodes);
cs = csBuilder.build();

assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
equalTo("current-data-node-with-1-task"));
}

public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
Expand Down

0 comments on commit b2ce728

Please sign in to comment.