Skip to content

Commit

Permalink
[ML] Reallocate model deployments on node shutdown events. (#85310) (#…
Browse files Browse the repository at this point in the history
…85329)

Trigger reallocation on Node Shutdown metadata changes first, and fall back to
node change events if the Node Shutdown API has not been used.
  • Loading branch information
davidkyle committed Mar 24, 2022
1 parent 882cd57 commit e255e40
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 6 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/85310.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85310
summary: Reallocate model deployments on node shutdown events
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
Expand Down Expand Up @@ -423,21 +424,63 @@ static boolean shouldAllocateModels(final ClusterChangedEvent event) {
if (newMetadata == null) {
return false;
}
if (event.nodesChanged()) {

// Reallocate in reaction to either node change events or
// changes triggered by the node shutdown API.
// When the shutdown API is used the metadata is modified
// before the node is removed and then once again after
// the node has returned. In this situation the node change
// events become a no-op due to the checks against shutting
// down nodes and because reallocation has already been
// triggered by the node shutdown metadata changes.
//
// If the shutdown API is not used the node change events
// are sufficient to cause a reallocation.
//
// Shutdowns should be respected so that the service does not
// allocate models to a node that is about to leave the cluster
boolean nodesShutdownChanged = event.changedCustomMetadataSet().contains(NodesShutdownMetadata.TYPE);
if (event.nodesChanged() || nodesShutdownChanged) {
Set<String> shuttingDownNodes = nodesShuttingDown(event.state());
DiscoveryNodes.Delta nodesDelta = event.nodesDelta();

Set<String> removedNodes = nodesDelta.removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
Set<String> addedNodes = nodesDelta.addedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());

Set<String> exitingShutDownNodes;
if (nodesShutdownChanged) {
Set<String> previousShuttingDownNodes = nodesShuttingDown(event.previousState());

// Add nodes that were marked for shutdown in the previous state
// but are no longer marked as shutdown in the current state.
Set<String> returningShutDownNodes = Sets.difference(previousShuttingDownNodes, shuttingDownNodes);
addedNodes.addAll(returningShutDownNodes);

// and nodes that are marked for shutdown in this event only
exitingShutDownNodes = Sets.difference(shuttingDownNodes, previousShuttingDownNodes);
removedNodes.addAll(exitingShutDownNodes);
} else {
exitingShutDownNodes = Collections.emptySet();
}

for (TrainedModelAllocation trainedModelAllocation : newMetadata.modelAllocations().values()) {
if (trainedModelAllocation.getAllocationState().equals(AllocationState.STOPPING)) {
continue;
}
for (DiscoveryNode removed : nodesDelta.removedNodes()) {
if (trainedModelAllocation.isRoutedToNode(removed.getId())) {
for (var nodeId : exitingShutDownNodes) {
if (trainedModelAllocation.isRoutedToNode(nodeId)) {
return true;
}
}

for (var nodeId : removedNodes) {
if (trainedModelAllocation.isRoutedToNode(nodeId) && shuttingDownNodes.contains(nodeId) == false) {
return true;
}
}
for (DiscoveryNode added : nodesDelta.addedNodes()) {
if (StartTrainedModelDeploymentAction.TaskParams.mayAllocateToNode(added)
&& shuttingDownNodes.contains(added.getId()) == false) {
for (var nodeId : addedNodes) {
if (StartTrainedModelDeploymentAction.TaskParams.mayAllocateToNode(event.state().nodes().get(nodeId))
&& shuttingDownNodes.contains(nodeId) == false) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.Before;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -830,6 +831,198 @@ public void testShouldAllocateModels() {
);
}

/**
 * Walks a cluster through several node shutdown life cycles and checks the
 * decision made by {@code TrainedModelAllocationClusterService.shouldAllocateModels}
 * for every transition.
 *
 * The single model in this test is routed to {@code ml-node-1} and {@code ml-node-2}.
 * Three scenarios are covered:
 * <ol>
 *   <li>a node that hosts the model is marked for shutdown, leaves, returns and is un-marked;</li>
 *   <li>a node that does not host the model goes through the same sequence;</li>
 *   <li>the shutdown marker and the node removal/return arrive in a single event.</li>
 * </ol>
 */
public void testShouldAllocateModels_WithNodeShutdowns() {
    String clusterName = "testShouldAllocateModels_WithNodeShutdowns";
    String model1 = "model-1";
    DiscoveryNode mlNode1 = buildNode("ml-node-1", true, ByteSizeValue.ofGb(4).getBytes());
    DiscoveryNode mlNode2 = buildNode("ml-node-2", true, ByteSizeValue.ofGb(4).getBytes());
    DiscoveryNode esNode1 = buildNode("es-node-1", false, ByteSizeValue.ofGb(4).getBytes());
    DiscoveryNode esNode2 = buildNode("es-node-2", false, ByteSizeValue.ofGb(4).getBytes());
    DiscoveryNode esNode3 = buildNode("es-node-3", false, ByteSizeValue.ofGb(4).getBytes());

    // The model is started on both ML nodes
    TrainedModelAllocationMetadata fullModelAllocation = TrainedModelAllocationMetadata.Builder.empty()
        .addNewAllocation(
            model1,
            TrainedModelAllocation.Builder.empty(newParams(model1, 100))
                .addNewRoutingEntry(mlNode1.getId())
                .updateExistingRoutingEntry(mlNode1.getId(), started())
                .addNewRoutingEntry(mlNode2.getId())
                .updateExistingRoutingEntry(mlNode2.getId(), started())
        )
        .build();

    ClusterState fullyAllocated = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder().putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation).build()
    ).build();

    // reallocate when the node is marked for shutdown
    var previousState = fullyAllocated;
    var currentState = ClusterState.builder(fullyAllocated)
        .metadata(
            Metadata.builder()
                .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
                .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(mlNode1))
                .build()
        )
        .build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(true)
    );

    previousState = currentState;

    // mlNode1 is now removed but we have already
    // reallocated on the node shutdown change
    currentState = csBuilderWithNodes(clusterName, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(mlNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    previousState = currentState;

    // mlNode1 has returned but is still marked as shutdown
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(mlNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    previousState = currentState;

    // mlNode1 no longer marked for shutdown
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder().putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation).build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(true)
    );

    previousState = currentState;

    // now an ES node is marked for shutdown
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(esNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    previousState = currentState;

    // The ES node is removed
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(esNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    previousState = currentState;

    // The ES node returns
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(esNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    previousState = currentState;

    // The ES node is no longer marked as shutdown: the shutdown metadata
    // is dropped from the state so that this step really exercises the
    // "marker removed" transition rather than repeating the previous state.
    // The model is not routed to es-node-1, and the node was built with
    // `false` as buildNode's second argument (presumably "is ML node" -
    // confirm against buildNode), so no reallocation is expected.
    currentState = csBuilderWithNodes(clusterName, mlNode1, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder().putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation).build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(false)
    );

    // shutdown and node removed in the same event
    previousState = fullyAllocated;
    currentState = csBuilderWithNodes(clusterName, mlNode2, esNode1, esNode2, esNode3).metadata(
        Metadata.builder()
            .putCustom(TrainedModelAllocationMetadata.NAME, fullModelAllocation)
            .putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata(mlNode1))
            .build()
    ).build();

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(true)
    );

    previousState = currentState;

    // node comes back and the shutdown is removed
    currentState = fullyAllocated;

    assertThat(
        TrainedModelAllocationClusterService.shouldAllocateModels(new ClusterChangedEvent("test", currentState, previousState)),
        is(true)
    );
}

/**
 * Creates a cluster state builder for a cluster with the given name whose
 * discovery nodes are exactly the supplied nodes.
 *
 * @param name  the cluster name
 * @param nodes the nodes to add to the cluster
 * @return a cluster state builder that callers can extend (e.g. with metadata) before building
 */
private ClusterState.Builder csBuilderWithNodes(String name, DiscoveryNode... nodes) {
    DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder();
    for (DiscoveryNode member : nodes) {
        discoveryNodes.add(member);
    }
    return ClusterState.builder(new ClusterName(name)).nodes(discoveryNodes);
}

/**
 * Builds shutdown metadata containing a single entry that marks the given
 * node for a {@code RESTART} type shutdown.
 *
 * @param nodeToShutdown the node to mark as shutting down
 * @return metadata with one shutdown entry keyed by the node's id
 */
private NodesShutdownMetadata nodesShutdownMetadata(DiscoveryNode nodeToShutdown) {
    String shutdownNodeId = nodeToShutdown.getId();
    SingleNodeShutdownMetadata restartMarker = SingleNodeShutdownMetadata.builder()
        .setNodeId(shutdownNodeId)
        .setStartedAtMillis(1L)
        .setType(SingleNodeShutdownMetadata.Type.RESTART)
        .setReason("because this cannot be null")
        .build();
    return new NodesShutdownMetadata(Map.of(shutdownNodeId, restartMarker));
}

public void testSetAllocationToStopping() {
ClusterState clusterStateWithoutAllocation = ClusterState.builder(new ClusterName("testSetAllocationToStopping"))
.metadata(Metadata.builder().build())
Expand Down

0 comments on commit e255e40

Please sign in to comment.